001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; 022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; 024 025import java.io.File; 026import java.io.IOException; 027import java.lang.reflect.Method; 028import java.net.URI; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.List; 032import java.util.concurrent.ExecutionException; 033import org.apache.commons.lang3.StringUtils; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.crypto.CipherSuite; 036import org.apache.hadoop.crypto.key.KeyProvider; 037import org.apache.hadoop.crypto.key.KeyProviderFactory; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 042import org.apache.hadoop.hbase.testclassification.LargeTests; 043import org.apache.hadoop.hbase.testclassification.MiscTests; 044import org.apache.hadoop.hdfs.DistributedFileSystem; 045import org.apache.hadoop.minikdc.MiniKdc; 046import org.apache.hadoop.security.UserGroupInformation; 047import org.junit.After; 048import org.junit.AfterClass; 049import org.junit.Before; 050import org.junit.BeforeClass; 051import org.junit.ClassRule; 052import org.junit.Rule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055import org.junit.rules.TestName; 056import org.junit.runner.RunWith; 057import org.junit.runners.Parameterized; 058import org.junit.runners.Parameterized.Parameter; 059import org.junit.runners.Parameterized.Parameters; 060 061import org.apache.hbase.thirdparty.io.netty.channel.Channel; 062import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 063import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 064import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 065import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 066 067@RunWith(Parameterized.class) 068@Category({ MiscTests.class, LargeTests.class }) 069public class TestSaslFanOutOneBlockAsyncDFSOutput { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class); 074 075 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 076 077 private static DistributedFileSystem FS; 078 079 private static EventLoopGroup EVENT_LOOP_GROUP; 080 081 private static Class<? extends Channel> CHANNEL_CLASS; 082 083 private static int READ_TIMEOUT_MS = 200000; 084 085 private static final File KEYTAB_FILE = 086 new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); 087 088 private static MiniKdc KDC; 089 090 private static String HOST = "localhost"; 091 092 private static String USERNAME; 093 094 private static String PRINCIPAL; 095 096 private static String HTTP_PRINCIPAL; 097 098 private static String TEST_KEY_NAME = "test_key"; 099 100 @Rule 101 public TestName name = new TestName(); 102 103 @Parameter(0) 104 public String protection; 105 106 @Parameter(1) 107 public String encryptionAlgorithm; 108 109 @Parameter(2) 110 public String cipherSuite; 111 112 @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}") 113 public static Iterable<Object[]> data() { 114 List<Object[]> params = new ArrayList<>(); 115 for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { 116 for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { 117 for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) { 118 params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite }); 119 } 120 } 121 } 122 return params; 123 } 124 125 private static void setUpKeyProvider(Configuration conf) throws Exception { 126 URI keyProviderUri = 127 new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString()); 128 conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); 129 KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf); 130 keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf)); 131 keyProvider.flush(); 132 keyProvider.close(); 133 } 134 135 @BeforeClass 136 public static void setUpBeforeClass() throws Exception { 137 EVENT_LOOP_GROUP = new NioEventLoopGroup(); 138 CHANNEL_CLASS = NioSocketChannel.class; 139 TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); 140 KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); 141 USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); 142 PRINCIPAL = USERNAME + "/" + HOST; 143 HTTP_PRINCIPAL = "HTTP/" + HOST; 144 KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL); 145 146 setUpKeyProvider(TEST_UTIL.getConfiguration()); 147 HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration(), 148 PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm()); 149 HBaseKerberosUtils.setSSLConfiguration(TEST_UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class); 150 } 151 152 @AfterClass 153 public static void tearDownAfterClass() throws IOException, InterruptedException { 154 if (EVENT_LOOP_GROUP != null) { 155 EVENT_LOOP_GROUP.shutdownGracefully().sync(); 156 } 157 if (KDC != null) { 158 KDC.stop(); 159 } 160 } 161 162 private Path testDirOnTestFs; 163 164 private Path entryptionTestDirOnTestFs; 165 166 private void createEncryptionZone() throws Exception { 167 Method method = 168 DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class); 169 method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME); 170 } 171 172 @Before 173 public void setUp() throws Exception { 174 TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); 175 if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) { 176 TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false); 177 } else { 178 TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true); 179 } 180 if (StringUtils.isBlank(encryptionAlgorithm)) { 181 TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); 182 } else { 183 TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm); 184 } 185 if (StringUtils.isBlank(cipherSuite)) { 186 TEST_UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 187 } else { 188 TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite); 189 } 190 191 TEST_UTIL.startMiniDFSCluster(3); 192 FS = TEST_UTIL.getDFSCluster().getFileSystem(); 193 testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); 194 FS.mkdirs(testDirOnTestFs); 195 entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc"); 196 FS.mkdirs(entryptionTestDirOnTestFs); 197 createEncryptionZone(); 198 } 199 200 @After 201 public void tearDown() throws IOException { 202 TEST_UTIL.shutdownMiniDFSCluster(); 203 } 204 205 private Path getTestFile() { 206 return new Path(testDirOnTestFs, "test"); 207 } 208 209 private Path getEncryptionTestFile() { 210 return new Path(entryptionTestDirOnTestFs, "test"); 211 } 212 213 private void test(Path file) throws IOException, InterruptedException, ExecutionException { 214 EventLoop eventLoop = EVENT_LOOP_GROUP.next(); 215 FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, 216 true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); 217 TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); 218 } 219 220 @Test 221 public void test() throws IOException, InterruptedException, ExecutionException { 222 test(getTestFile()); 223 test(getEncryptionTestFile()); 224 } 225}