001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
024
025import java.io.File;
026import java.io.IOException;
027import java.net.BindException;
028import java.net.URI;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.Properties;
033import java.util.concurrent.ExecutionException;
034import org.apache.commons.io.FileUtils;
035import org.apache.commons.lang3.StringUtils;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.crypto.CipherSuite;
038import org.apache.hadoop.crypto.key.KeyProvider;
039import org.apache.hadoop.crypto.key.KeyProviderFactory;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
043import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
044import org.apache.hadoop.hbase.security.SecurityConstants;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.testclassification.MiscTests;
047import org.apache.hadoop.hdfs.DistributedFileSystem;
048import org.apache.hadoop.minikdc.MiniKdc;
049import org.apache.hadoop.security.UserGroupInformation;
050import org.junit.After;
051import org.junit.AfterClass;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059import org.junit.runner.RunWith;
060import org.junit.runners.Parameterized;
061import org.junit.runners.Parameterized.Parameter;
062import org.junit.runners.Parameterized.Parameters;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.io.netty.channel.Channel;
067import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
068import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
069import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
070import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
071
072@RunWith(Parameterized.class)
073@Category({ MiscTests.class, LargeTests.class })
074public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
075
076  private static final Logger LOG =
077    LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class);
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class);
082
083  private static DistributedFileSystem FS;
084
085  private static EventLoopGroup EVENT_LOOP_GROUP;
086
087  private static Class<? extends Channel> CHANNEL_CLASS;
088
089  private static int READ_TIMEOUT_MS = 200000;
090
091  private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
092
093  private static MiniKdc KDC;
094
095  private static String HOST = "localhost";
096
097  private static String USERNAME;
098
099  private static String PRINCIPAL;
100
101  private static String HTTP_PRINCIPAL;
102
103  private static String TEST_KEY_NAME = "test_key";
104
105  private static StreamSlowMonitor MONITOR;
106
107  @Rule
108  public TestName name = new TestName();
109
110  @Parameter(0)
111  public String protection;
112
113  @Parameter(1)
114  public String encryptionAlgorithm;
115
116  @Parameter(2)
117  public String cipherSuite;
118
119  @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
120  public static Iterable<Object[]> data() {
121    List<Object[]> params = new ArrayList<>();
122    for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
123      for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
124        for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
125          params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
126        }
127      }
128    }
129    return params;
130  }
131
132  private static void setUpKeyProvider(Configuration conf) throws Exception {
133    URI keyProviderUri =
134      new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString());
135    conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
136    KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
137    keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
138    keyProvider.flush();
139    keyProvider.close();
140  }
141
142  /**
143   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
144   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
145   */
146  private static MiniKdc setupMiniKdc(File keytabFile) throws Exception {
147    Properties conf = MiniKdc.createConf();
148    conf.put(MiniKdc.DEBUG, true);
149    MiniKdc kdc = null;
150    File dir = null;
151    // There is time lag between selecting a port and trying to bind with it. It's possible that
152    // another service captures the port in between which'll result in BindException.
153    boolean bindException;
154    int numTries = 0;
155    do {
156      try {
157        bindException = false;
158        dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
159        kdc = new MiniKdc(conf, dir);
160        kdc.start();
161      } catch (BindException e) {
162        FileUtils.deleteDirectory(dir); // clean directory
163        numTries++;
164        if (numTries == 3) {
165          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
166          throw e;
167        }
168        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
169        bindException = true;
170      }
171    } while (bindException);
172    System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
173      keytabFile.getAbsolutePath());
174    return kdc;
175  }
176
177  @BeforeClass
178  public static void setUpBeforeClass() throws Exception {
179    EVENT_LOOP_GROUP = new NioEventLoopGroup();
180    CHANNEL_CLASS = NioSocketChannel.class;
181    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
182    KDC = setupMiniKdc(KEYTAB_FILE);
183    USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
184    PRINCIPAL = USERNAME + "/" + HOST;
185    HTTP_PRINCIPAL = "HTTP/" + HOST;
186    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
187
188    setUpKeyProvider(UTIL.getConfiguration());
189    HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
190      PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
191    HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
192    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
193  }
194
195  @AfterClass
196  public static void tearDownAfterClass() throws Exception {
197    if (EVENT_LOOP_GROUP != null) {
198      EVENT_LOOP_GROUP.shutdownGracefully().get();
199    }
200    if (KDC != null) {
201      KDC.stop();
202    }
203    shutdownMiniDFSCluster();
204  }
205
206  private Path testDirOnTestFs;
207
208  private Path entryptionTestDirOnTestFs;
209
210  private void createEncryptionZone() throws Exception {
211    FS.createEncryptionZone(entryptionTestDirOnTestFs, TEST_KEY_NAME);
212  }
213
214  @Before
215  public void setUp() throws Exception {
216    UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
217    if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) {
218      UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
219    } else {
220      UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
221    }
222    if (StringUtils.isBlank(encryptionAlgorithm)) {
223      UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
224    } else {
225      UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
226    }
227    if (StringUtils.isBlank(cipherSuite)) {
228      UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
229    } else {
230      UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
231    }
232
233    startMiniDFSCluster(3);
234    FS = CLUSTER.getFileSystem();
235    testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
236    FS.mkdirs(testDirOnTestFs);
237    entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
238    FS.mkdirs(entryptionTestDirOnTestFs);
239    createEncryptionZone();
240  }
241
242  @After
243  public void tearDown() throws IOException {
244    shutdownMiniDFSCluster();
245  }
246
247  private Path getTestFile() {
248    return new Path(testDirOnTestFs, "test");
249  }
250
251  private Path getEncryptionTestFile() {
252    return new Path(entryptionTestDirOnTestFs, "test");
253  }
254
255  private void test(Path file) throws IOException, InterruptedException, ExecutionException {
256    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
257    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
258      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
259    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
260  }
261
262  @Test
263  public void test() throws IOException, InterruptedException, ExecutionException {
264    test(getTestFile());
265    test(getEncryptionTestFile());
266  }
267}