001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
024import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
025import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
026import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
027import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
028import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
029import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
030import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
031import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
032import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
033
034import java.io.File;
035import java.io.IOException;
036import java.lang.reflect.Method;
037import java.net.URI;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.List;
041import java.util.concurrent.ExecutionException;
042import org.apache.commons.lang3.StringUtils;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.crypto.CipherSuite;
045import org.apache.hadoop.crypto.key.KeyProvider;
046import org.apache.hadoop.crypto.key.KeyProviderFactory;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseTestingUtility;
050import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
051import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
052import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken;
053import org.apache.hadoop.hbase.testclassification.LargeTests;
054import org.apache.hadoop.hbase.testclassification.MiscTests;
055import org.apache.hadoop.hdfs.DistributedFileSystem;
056import org.apache.hadoop.http.HttpConfig;
057import org.apache.hadoop.minikdc.MiniKdc;
058import org.apache.hadoop.security.UserGroupInformation;
059import org.junit.After;
060import org.junit.AfterClass;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Rule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.junit.rules.TestName;
068import org.junit.runner.RunWith;
069import org.junit.runners.Parameterized;
070import org.junit.runners.Parameterized.Parameter;
071import org.junit.runners.Parameterized.Parameters;
072
073import org.apache.hbase.thirdparty.io.netty.channel.Channel;
074import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
075import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
076import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
077import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
078
079@RunWith(Parameterized.class)
080@Category({ MiscTests.class, LargeTests.class })
081public class TestSaslFanOutOneBlockAsyncDFSOutput {
082
083  @ClassRule
084  public static final HBaseClassTestRule CLASS_RULE =
085      HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class);
086
087  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
088
089  private static DistributedFileSystem FS;
090
091  private static EventLoopGroup EVENT_LOOP_GROUP;
092
093  private static Class<? extends Channel> CHANNEL_CLASS;
094
095  private static int READ_TIMEOUT_MS = 200000;
096
097  private static final File KEYTAB_FILE =
098    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
099
100  private static MiniKdc KDC;
101
102  private static String HOST = "localhost";
103
104  private static String USERNAME;
105
106  private static String PRINCIPAL;
107
108  private static String HTTP_PRINCIPAL;
109
110  private static String TEST_KEY_NAME = "test_key";
111
112  @Rule
113  public TestName name = new TestName();
114
115  @Parameter(0)
116  public String protection;
117
118  @Parameter(1)
119  public String encryptionAlgorithm;
120
121  @Parameter(2)
122  public String cipherSuite;
123
124  @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
125  public static Iterable<Object[]> data() {
126    List<Object[]> params = new ArrayList<>();
127    for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
128      for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
129        for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
130          params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
131        }
132      }
133    }
134    return params;
135  }
136
137  private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception {
138    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm());
139    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
140    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm());
141    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath());
142    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm());
143    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
144    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
145    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
146    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
147
148    File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath());
149    keystoresDir.mkdirs();
150    String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestGenerateDelegationToken.class);
151    KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false);
152
153    conf.setBoolean("ignore.secure.ports.for.testing", true);
154  }
155
156  private static void setUpKeyProvider(Configuration conf) throws Exception {
157    URI keyProviderUri =
158      new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
159    conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
160    KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
161    keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
162    keyProvider.flush();
163    keyProvider.close();
164  }
165
166  @BeforeClass
167  public static void setUpBeforeClass() throws Exception {
168    EVENT_LOOP_GROUP = new NioEventLoopGroup();
169    CHANNEL_CLASS = NioSocketChannel.class;
170    TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
171    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
172    USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
173    PRINCIPAL = USERNAME + "/" + HOST;
174    HTTP_PRINCIPAL = "HTTP/" + HOST;
175    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
176
177    setUpKeyProvider(TEST_UTIL.getConfiguration());
178    setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration());
179    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
180    HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration());
181    UserGroupInformation.setConfiguration(TEST_UTIL.getConfiguration());
182  }
183
184  @AfterClass
185  public static void tearDownAfterClass() throws IOException, InterruptedException {
186    if (EVENT_LOOP_GROUP != null) {
187      EVENT_LOOP_GROUP.shutdownGracefully().sync();
188    }
189    if (KDC != null) {
190      KDC.stop();
191    }
192  }
193
194  private Path testDirOnTestFs;
195
196  private Path entryptionTestDirOnTestFs;
197
198  private void createEncryptionZone() throws Exception {
199    Method method =
200      DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
201    method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME);
202  }
203
204  @Before
205  public void setUp() throws Exception {
206    TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
207    if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) {
208      TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
209    } else {
210      TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
211    }
212    if (StringUtils.isBlank(encryptionAlgorithm)) {
213      TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
214    } else {
215      TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
216    }
217    if (StringUtils.isBlank(cipherSuite)) {
218      TEST_UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
219    } else {
220      TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
221    }
222
223    TEST_UTIL.startMiniDFSCluster(3);
224    FS = TEST_UTIL.getDFSCluster().getFileSystem();
225    testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
226    FS.mkdirs(testDirOnTestFs);
227    entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
228    FS.mkdirs(entryptionTestDirOnTestFs);
229    createEncryptionZone();
230  }
231
232  @After
233  public void tearDown() throws IOException {
234    TEST_UTIL.shutdownMiniDFSCluster();
235  }
236
237  private Path getTestFile() {
238    return new Path(testDirOnTestFs, "test");
239  }
240
241  private Path getEncryptionTestFile() {
242    return new Path(entryptionTestDirOnTestFs, "test");
243  }
244
245  private void test(Path file) throws IOException, InterruptedException, ExecutionException {
246    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
247    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
248      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
249    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
250  }
251
252  @Test
253  public void test() throws IOException, InterruptedException, ExecutionException {
254    test(getTestFile());
255    test(getEncryptionTestFile());
256  }
257}