001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
024
025import java.io.File;
026import java.io.IOException;
027import java.lang.reflect.Method;
028import java.net.BindException;
029import java.net.URI;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.Properties;
034import java.util.concurrent.ExecutionException;
035import org.apache.commons.io.FileUtils;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.crypto.CipherSuite;
039import org.apache.hadoop.crypto.key.KeyProvider;
040import org.apache.hadoop.crypto.key.KeyProviderFactory;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
044import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
045import org.apache.hadoop.hbase.security.SecurityConstants;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.testclassification.MiscTests;
048import org.apache.hadoop.hdfs.DistributedFileSystem;
049import org.apache.hadoop.minikdc.MiniKdc;
050import org.apache.hadoop.security.UserGroupInformation;
051import org.junit.After;
052import org.junit.AfterClass;
053import org.junit.Before;
054import org.junit.BeforeClass;
055import org.junit.ClassRule;
056import org.junit.Rule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.junit.rules.TestName;
060import org.junit.runner.RunWith;
061import org.junit.runners.Parameterized;
062import org.junit.runners.Parameterized.Parameter;
063import org.junit.runners.Parameterized.Parameters;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067import org.apache.hbase.thirdparty.io.netty.channel.Channel;
068import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
069import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
070import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
071import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
072
073@RunWith(Parameterized.class)
074@Category({ MiscTests.class, LargeTests.class })
075public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
076
077  private static final Logger LOG =
078    LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class);
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082    HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class);
083
084  private static DistributedFileSystem FS;
085
086  private static EventLoopGroup EVENT_LOOP_GROUP;
087
088  private static Class<? extends Channel> CHANNEL_CLASS;
089
090  private static int READ_TIMEOUT_MS = 200000;
091
092  private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
093
094  private static MiniKdc KDC;
095
096  private static String HOST = "localhost";
097
098  private static String USERNAME;
099
100  private static String PRINCIPAL;
101
102  private static String HTTP_PRINCIPAL;
103
104  private static String TEST_KEY_NAME = "test_key";
105
106  private static StreamSlowMonitor MONITOR;
107
108  @Rule
109  public TestName name = new TestName();
110
111  @Parameter(0)
112  public String protection;
113
114  @Parameter(1)
115  public String encryptionAlgorithm;
116
117  @Parameter(2)
118  public String cipherSuite;
119
120  @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
121  public static Iterable<Object[]> data() {
122    List<Object[]> params = new ArrayList<>();
123    for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
124      for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
125        for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
126          params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
127        }
128      }
129    }
130    return params;
131  }
132
133  private static void setUpKeyProvider(Configuration conf) throws Exception {
134    URI keyProviderUri =
135      new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString());
136    conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
137    KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
138    keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
139    keyProvider.flush();
140    keyProvider.close();
141  }
142
143  /**
144   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
145   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
146   */
147  private static MiniKdc setupMiniKdc(File keytabFile) throws Exception {
148    Properties conf = MiniKdc.createConf();
149    conf.put(MiniKdc.DEBUG, true);
150    MiniKdc kdc = null;
151    File dir = null;
152    // There is time lag between selecting a port and trying to bind with it. It's possible that
153    // another service captures the port in between which'll result in BindException.
154    boolean bindException;
155    int numTries = 0;
156    do {
157      try {
158        bindException = false;
159        dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
160        kdc = new MiniKdc(conf, dir);
161        kdc.start();
162      } catch (BindException e) {
163        FileUtils.deleteDirectory(dir); // clean directory
164        numTries++;
165        if (numTries == 3) {
166          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
167          throw e;
168        }
169        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
170        bindException = true;
171      }
172    } while (bindException);
173    System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
174      keytabFile.getAbsolutePath());
175    return kdc;
176  }
177
178  @BeforeClass
179  public static void setUpBeforeClass() throws Exception {
180    EVENT_LOOP_GROUP = new NioEventLoopGroup();
181    CHANNEL_CLASS = NioSocketChannel.class;
182    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
183    KDC = setupMiniKdc(KEYTAB_FILE);
184    USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
185    PRINCIPAL = USERNAME + "/" + HOST;
186    HTTP_PRINCIPAL = "HTTP/" + HOST;
187    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
188
189    setUpKeyProvider(UTIL.getConfiguration());
190    HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
191      PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
192    HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
193    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
194  }
195
196  @AfterClass
197  public static void tearDownAfterClass() throws Exception {
198    if (EVENT_LOOP_GROUP != null) {
199      EVENT_LOOP_GROUP.shutdownGracefully().get();
200    }
201    if (KDC != null) {
202      KDC.stop();
203    }
204    shutdownMiniDFSCluster();
205  }
206
207  private Path testDirOnTestFs;
208
209  private Path entryptionTestDirOnTestFs;
210
211  private void createEncryptionZone() throws Exception {
212    Method method =
213      DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
214    method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME);
215  }
216
217  @Before
218  public void setUp() throws Exception {
219    UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
220    if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) {
221      UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
222    } else {
223      UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
224    }
225    if (StringUtils.isBlank(encryptionAlgorithm)) {
226      UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
227    } else {
228      UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
229    }
230    if (StringUtils.isBlank(cipherSuite)) {
231      UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
232    } else {
233      UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
234    }
235
236    startMiniDFSCluster(3);
237    FS = CLUSTER.getFileSystem();
238    testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
239    FS.mkdirs(testDirOnTestFs);
240    entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
241    FS.mkdirs(entryptionTestDirOnTestFs);
242    createEncryptionZone();
243  }
244
245  @After
246  public void tearDown() throws IOException {
247    shutdownMiniDFSCluster();
248  }
249
250  private Path getTestFile() {
251    return new Path(testDirOnTestFs, "test");
252  }
253
254  private Path getEncryptionTestFile() {
255    return new Path(entryptionTestDirOnTestFs, "test");
256  }
257
258  private void test(Path file) throws IOException, InterruptedException, ExecutionException {
259    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
260    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
261      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
262    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
263  }
264
265  @Test
266  public void test() throws IOException, InterruptedException, ExecutionException {
267    test(getTestFile());
268    test(getEncryptionTestFile());
269  }
270}