001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
024
025import java.io.File;
026import java.io.IOException;
027import java.lang.reflect.Method;
028import java.net.URI;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.concurrent.ExecutionException;
033import org.apache.commons.lang3.StringUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.crypto.CipherSuite;
036import org.apache.hadoop.crypto.key.KeyProvider;
037import org.apache.hadoop.crypto.key.KeyProviderFactory;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.testclassification.MiscTests;
044import org.apache.hadoop.hdfs.DistributedFileSystem;
045import org.apache.hadoop.minikdc.MiniKdc;
046import org.apache.hadoop.security.UserGroupInformation;
047import org.junit.After;
048import org.junit.AfterClass;
049import org.junit.Before;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Rule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.junit.rules.TestName;
056import org.junit.runner.RunWith;
057import org.junit.runners.Parameterized;
058import org.junit.runners.Parameterized.Parameter;
059import org.junit.runners.Parameterized.Parameters;
060
061import org.apache.hbase.thirdparty.io.netty.channel.Channel;
062import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
063import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
064import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
065import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
066
067@RunWith(Parameterized.class)
068@Category({ MiscTests.class, LargeTests.class })
069public class TestSaslFanOutOneBlockAsyncDFSOutput {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073      HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class);
074
075  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
076
077  private static DistributedFileSystem FS;
078
079  private static EventLoopGroup EVENT_LOOP_GROUP;
080
081  private static Class<? extends Channel> CHANNEL_CLASS;
082
083  private static int READ_TIMEOUT_MS = 200000;
084
085  private static final File KEYTAB_FILE =
086    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
087
088  private static MiniKdc KDC;
089
090  private static String HOST = "localhost";
091
092  private static String USERNAME;
093
094  private static String PRINCIPAL;
095
096  private static String HTTP_PRINCIPAL;
097
098  private static String TEST_KEY_NAME = "test_key";
099
100  @Rule
101  public TestName name = new TestName();
102
103  @Parameter(0)
104  public String protection;
105
106  @Parameter(1)
107  public String encryptionAlgorithm;
108
109  @Parameter(2)
110  public String cipherSuite;
111
112  @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
113  public static Iterable<Object[]> data() {
114    List<Object[]> params = new ArrayList<>();
115    for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
116      for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
117        for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
118          params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
119        }
120      }
121    }
122    return params;
123  }
124
125  private static void setUpKeyProvider(Configuration conf) throws Exception {
126    URI keyProviderUri =
127      new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
128    conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
129    KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
130    keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
131    keyProvider.flush();
132    keyProvider.close();
133  }
134
135  @BeforeClass
136  public static void setUpBeforeClass() throws Exception {
137    EVENT_LOOP_GROUP = new NioEventLoopGroup();
138    CHANNEL_CLASS = NioSocketChannel.class;
139    TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
140    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
141    USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
142    PRINCIPAL = USERNAME + "/" + HOST;
143    HTTP_PRINCIPAL = "HTTP/" + HOST;
144    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
145
146    setUpKeyProvider(TEST_UTIL.getConfiguration());
147    HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration(),
148        PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
149    HBaseKerberosUtils.setSSLConfiguration(TEST_UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
150  }
151
152  @AfterClass
153  public static void tearDownAfterClass() throws IOException, InterruptedException {
154    if (EVENT_LOOP_GROUP != null) {
155      EVENT_LOOP_GROUP.shutdownGracefully().sync();
156    }
157    if (KDC != null) {
158      KDC.stop();
159    }
160  }
161
162  private Path testDirOnTestFs;
163
164  private Path entryptionTestDirOnTestFs;
165
166  private void createEncryptionZone() throws Exception {
167    Method method =
168      DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
169    method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME);
170  }
171
172  @Before
173  public void setUp() throws Exception {
174    TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
175    if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) {
176      TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
177    } else {
178      TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
179    }
180    if (StringUtils.isBlank(encryptionAlgorithm)) {
181      TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
182    } else {
183      TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
184    }
185    if (StringUtils.isBlank(cipherSuite)) {
186      TEST_UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
187    } else {
188      TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
189    }
190
191    TEST_UTIL.startMiniDFSCluster(3);
192    FS = TEST_UTIL.getDFSCluster().getFileSystem();
193    testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
194    FS.mkdirs(testDirOnTestFs);
195    entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
196    FS.mkdirs(entryptionTestDirOnTestFs);
197    createEncryptionZone();
198  }
199
200  @After
201  public void tearDown() throws IOException {
202    TEST_UTIL.shutdownMiniDFSCluster();
203  }
204
205  private Path getTestFile() {
206    return new Path(testDirOnTestFs, "test");
207  }
208
209  private Path getEncryptionTestFile() {
210    return new Path(entryptionTestDirOnTestFs, "test");
211  }
212
213  private void test(Path file) throws IOException, InterruptedException, ExecutionException {
214    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
215    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
216      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
217    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
218  }
219
220  @Test
221  public void test() throws IOException, InterruptedException, ExecutionException {
222    test(getTestFile());
223    test(getEncryptionTestFile());
224  }
225}