001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
024
025import java.io.File;
026import java.io.IOException;
027import java.lang.reflect.Method;
028import java.net.BindException;
029import java.net.URI;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.Properties;
034import java.util.concurrent.ExecutionException;
035import org.apache.commons.io.FileUtils;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.crypto.CipherSuite;
039import org.apache.hadoop.crypto.key.KeyProvider;
040import org.apache.hadoop.crypto.key.KeyProviderFactory;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
044import org.apache.hadoop.hbase.security.SecurityConstants;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.apache.hadoop.hbase.testclassification.MiscTests;
047import org.apache.hadoop.hdfs.DistributedFileSystem;
048import org.apache.hadoop.minikdc.MiniKdc;
049import org.apache.hadoop.security.UserGroupInformation;
050import org.junit.After;
051import org.junit.AfterClass;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Rule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.junit.rules.TestName;
059import org.junit.runner.RunWith;
060import org.junit.runners.Parameterized;
061import org.junit.runners.Parameterized.Parameter;
062import org.junit.runners.Parameterized.Parameters;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.io.netty.channel.Channel;
067import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
068import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
069import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
070import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
071
072@RunWith(Parameterized.class)
073@Category({ MiscTests.class, LargeTests.class })
074public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
075
076  private static final Logger LOG =
077    LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class);
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class);
082
083  private static DistributedFileSystem FS;
084
085  private static EventLoopGroup EVENT_LOOP_GROUP;
086
087  private static Class<? extends Channel> CHANNEL_CLASS;
088
089  private static int READ_TIMEOUT_MS = 200000;
090
091  private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
092
093  private static MiniKdc KDC;
094
095  private static String HOST = "localhost";
096
097  private static String USERNAME;
098
099  private static String PRINCIPAL;
100
101  private static String HTTP_PRINCIPAL;
102
103  private static String TEST_KEY_NAME = "test_key";
104
105  @Rule
106  public TestName name = new TestName();
107
108  @Parameter(0)
109  public String protection;
110
111  @Parameter(1)
112  public String encryptionAlgorithm;
113
114  @Parameter(2)
115  public String cipherSuite;
116
117  @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
118  public static Iterable<Object[]> data() {
119    List<Object[]> params = new ArrayList<>();
120    for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
121      for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
122        for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
123          params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
124        }
125      }
126    }
127    return params;
128  }
129
130  private static void setUpKeyProvider(Configuration conf) throws Exception {
131    URI keyProviderUri =
132      new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString());
133    conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
134    KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
135    keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
136    keyProvider.flush();
137    keyProvider.close();
138  }
139
140  /**
141   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
142   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
143   */
144  private static MiniKdc setupMiniKdc(File keytabFile) throws Exception {
145    Properties conf = MiniKdc.createConf();
146    conf.put(MiniKdc.DEBUG, true);
147    MiniKdc kdc = null;
148    File dir = null;
149    // There is time lag between selecting a port and trying to bind with it. It's possible that
150    // another service captures the port in between which'll result in BindException.
151    boolean bindException;
152    int numTries = 0;
153    do {
154      try {
155        bindException = false;
156        dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
157        kdc = new MiniKdc(conf, dir);
158        kdc.start();
159      } catch (BindException e) {
160        FileUtils.deleteDirectory(dir); // clean directory
161        numTries++;
162        if (numTries == 3) {
163          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
164          throw e;
165        }
166        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
167        bindException = true;
168      }
169    } while (bindException);
170    System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
171      keytabFile.getAbsolutePath());
172    return kdc;
173  }
174
175  @BeforeClass
176  public static void setUpBeforeClass() throws Exception {
177    EVENT_LOOP_GROUP = new NioEventLoopGroup();
178    CHANNEL_CLASS = NioSocketChannel.class;
179    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
180    KDC = setupMiniKdc(KEYTAB_FILE);
181    USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
182    PRINCIPAL = USERNAME + "/" + HOST;
183    HTTP_PRINCIPAL = "HTTP/" + HOST;
184    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
185
186    setUpKeyProvider(UTIL.getConfiguration());
187    HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
188      PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
189    HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
190  }
191
192  @AfterClass
193  public static void tearDownAfterClass() throws IOException, InterruptedException {
194    if (EVENT_LOOP_GROUP != null) {
195      EVENT_LOOP_GROUP.shutdownGracefully().sync();
196    }
197    if (KDC != null) {
198      KDC.stop();
199    }
200    shutdownMiniDFSCluster();
201  }
202
203  private Path testDirOnTestFs;
204
205  private Path entryptionTestDirOnTestFs;
206
207  private void createEncryptionZone() throws Exception {
208    Method method =
209      DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
210    method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME);
211  }
212
213  @Before
214  public void setUp() throws Exception {
215    UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
216    if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) {
217      UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
218    } else {
219      UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
220    }
221    if (StringUtils.isBlank(encryptionAlgorithm)) {
222      UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
223    } else {
224      UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
225    }
226    if (StringUtils.isBlank(cipherSuite)) {
227      UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
228    } else {
229      UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
230    }
231
232    startMiniDFSCluster(3);
233    FS = CLUSTER.getFileSystem();
234    testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
235    FS.mkdirs(testDirOnTestFs);
236    entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
237    FS.mkdirs(entryptionTestDirOnTestFs);
238    createEncryptionZone();
239  }
240
241  @After
242  public void tearDown() throws IOException {
243    shutdownMiniDFSCluster();
244  }
245
246  private Path getTestFile() {
247    return new Path(testDirOnTestFs, "test");
248  }
249
250  private Path getEncryptionTestFile() {
251    return new Path(entryptionTestDirOnTestFs, "test");
252  }
253
254  private void test(Path file) throws IOException, InterruptedException, ExecutionException {
255    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
256    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
257      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
258    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
259  }
260
261  @Test
262  public void test() throws IOException, InterruptedException, ExecutionException {
263    test(getTestFile());
264    test(getEncryptionTestFile());
265  }
266}