001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY; 021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; 023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; 024import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; 025import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; 026import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 027import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; 028import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY; 029import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; 030import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; 031import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; 032import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; 033 034import java.io.File; 035import java.io.IOException; 036import java.lang.reflect.Method; 037import java.net.URI; 038import java.util.ArrayList; 039import java.util.Arrays; 040import java.util.List; 041import java.util.concurrent.ExecutionException; 042import org.apache.commons.lang3.StringUtils; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.crypto.CipherSuite; 045import org.apache.hadoop.crypto.key.KeyProvider; 046import org.apache.hadoop.crypto.key.KeyProviderFactory; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.hbase.HBaseClassTestRule; 049import org.apache.hadoop.hbase.HBaseTestingUtility; 050import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; 051import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 052import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken; 053import org.apache.hadoop.hbase.testclassification.LargeTests; 054import org.apache.hadoop.hbase.testclassification.MiscTests; 055import org.apache.hadoop.hdfs.DistributedFileSystem; 056import org.apache.hadoop.http.HttpConfig; 057import org.apache.hadoop.minikdc.MiniKdc; 058import org.apache.hadoop.security.UserGroupInformation; 059import org.junit.After; 060import org.junit.AfterClass; 061import org.junit.Before; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Rule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.junit.rules.TestName; 068import org.junit.runner.RunWith; 069import org.junit.runners.Parameterized; 070import org.junit.runners.Parameterized.Parameter; 071import org.junit.runners.Parameterized.Parameters; 072 073import org.apache.hbase.thirdparty.io.netty.channel.Channel; 074import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 075import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 076import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 077import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 078 079@RunWith(Parameterized.class) 080@Category({ MiscTests.class, LargeTests.class }) 081public class TestSaslFanOutOneBlockAsyncDFSOutput { 082 083 @ClassRule 084 public static final HBaseClassTestRule CLASS_RULE = 085 HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class); 086 087 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 088 089 private static DistributedFileSystem FS; 090 091 private static EventLoopGroup EVENT_LOOP_GROUP; 092 093 private static Class<? extends Channel> CHANNEL_CLASS; 094 095 private static int READ_TIMEOUT_MS = 200000; 096 097 private static final File KEYTAB_FILE = 098 new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); 099 100 private static MiniKdc KDC; 101 102 private static String HOST = "localhost"; 103 104 private static String USERNAME; 105 106 private static String PRINCIPAL; 107 108 private static String HTTP_PRINCIPAL; 109 110 private static String TEST_KEY_NAME = "test_key"; 111 112 @Rule 113 public TestName name = new TestName(); 114 115 @Parameter(0) 116 public String protection; 117 118 @Parameter(1) 119 public String encryptionAlgorithm; 120 121 @Parameter(2) 122 public String cipherSuite; 123 124 @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}") 125 public static Iterable<Object[]> data() { 126 List<Object[]> params = new ArrayList<>(); 127 for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { 128 for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { 129 for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) { 130 params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite }); 131 } 132 } 133 } 134 return params; 135 } 136 137 private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception { 138 conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm()); 139 conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); 140 conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, PRINCIPAL + "@" + KDC.getRealm()); 141 conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); 142 conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, HTTP_PRINCIPAL + "@" + KDC.getRealm()); 143 conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); 144 conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); 145 conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); 146 conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); 147 148 File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath()); 149 keystoresDir.mkdirs(); 150 String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestGenerateDelegationToken.class); 151 KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false); 152 153 conf.setBoolean("ignore.secure.ports.for.testing", true); 154 } 155 156 private static void setUpKeyProvider(Configuration conf) throws Exception { 157 URI keyProviderUri = 158 new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString()); 159 conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); 160 KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf); 161 keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf)); 162 keyProvider.flush(); 163 keyProvider.close(); 164 } 165 166 @BeforeClass 167 public static void setUpBeforeClass() throws Exception { 168 EVENT_LOOP_GROUP = new NioEventLoopGroup(); 169 CHANNEL_CLASS = NioSocketChannel.class; 170 TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); 171 KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); 172 USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); 173 PRINCIPAL = USERNAME + "/" + HOST; 174 HTTP_PRINCIPAL = "HTTP/" + HOST; 175 KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL); 176 177 setUpKeyProvider(TEST_UTIL.getConfiguration()); 178 setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration()); 179 HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); 180 HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration()); 181 UserGroupInformation.setConfiguration(TEST_UTIL.getConfiguration()); 182 } 183 184 @AfterClass 185 public static void tearDownAfterClass() throws IOException, InterruptedException { 186 if (EVENT_LOOP_GROUP != null) { 187 EVENT_LOOP_GROUP.shutdownGracefully().sync(); 188 } 189 if (KDC != null) { 190 KDC.stop(); 191 } 192 } 193 194 private Path testDirOnTestFs; 195 196 private Path entryptionTestDirOnTestFs; 197 198 private void createEncryptionZone() throws Exception { 199 Method method = 200 DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class); 201 method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME); 202 } 203 204 @Before 205 public void setUp() throws Exception { 206 TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); 207 if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) { 208 TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false); 209 } else { 210 TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true); 211 } 212 if (StringUtils.isBlank(encryptionAlgorithm)) { 213 TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); 214 } else { 215 TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm); 216 } 217 if (StringUtils.isBlank(cipherSuite)) { 218 TEST_UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 219 } else { 220 TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite); 221 } 222 223 TEST_UTIL.startMiniDFSCluster(3); 224 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 225 testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); 226 FS.mkdirs(testDirOnTestFs); 227 entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc"); 228 FS.mkdirs(entryptionTestDirOnTestFs); 229 createEncryptionZone(); 230 } 231 232 @After 233 public void tearDown() throws IOException { 234 TEST_UTIL.shutdownMiniDFSCluster(); 235 } 236 237 private Path getTestFile() { 238 return new Path(testDirOnTestFs, "test"); 239 } 240 241 private Path getEncryptionTestFile() { 242 return new Path(entryptionTestDirOnTestFs, "test"); 243 } 244 245 private void test(Path file) throws IOException, InterruptedException, ExecutionException { 246 EventLoop eventLoop = EVENT_LOOP_GROUP.next(); 247 FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, 248 true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); 249 TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); 250 } 251 252 @Test 253 public void test() throws IOException, InterruptedException, ExecutionException { 254 test(getTestFile()); 255 test(getEncryptionTestFile()); 256 } 257}