001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; 022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; 024 025import java.io.File; 026import java.io.IOException; 027import java.lang.reflect.Method; 028import java.net.BindException; 029import java.net.URI; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.List; 033import java.util.Properties; 034import java.util.concurrent.ExecutionException; 035import org.apache.commons.io.FileUtils; 036import org.apache.commons.lang3.StringUtils; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.crypto.CipherSuite; 039import org.apache.hadoop.crypto.key.KeyProvider; 040import org.apache.hadoop.crypto.key.KeyProviderFactory; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 044import org.apache.hadoop.hbase.security.SecurityConstants; 045import org.apache.hadoop.hbase.testclassification.LargeTests; 046import org.apache.hadoop.hbase.testclassification.MiscTests; 047import org.apache.hadoop.hdfs.DistributedFileSystem; 048import org.apache.hadoop.minikdc.MiniKdc; 049import org.apache.hadoop.security.UserGroupInformation; 050import org.junit.After; 051import org.junit.AfterClass; 052import org.junit.Before; 053import org.junit.BeforeClass; 054import org.junit.ClassRule; 055import org.junit.Rule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.junit.rules.TestName; 059import org.junit.runner.RunWith; 060import org.junit.runners.Parameterized; 061import org.junit.runners.Parameterized.Parameter; 062import org.junit.runners.Parameterized.Parameters; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.io.netty.channel.Channel; 067import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 068import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 069import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 070import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 071 072@RunWith(Parameterized.class) 073@Category({ MiscTests.class, LargeTests.class }) 074public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { 075 076 private static final Logger LOG = 077 LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class); 078 079 @ClassRule 080 public static final HBaseClassTestRule CLASS_RULE = 081 HBaseClassTestRule.forClass(TestSaslFanOutOneBlockAsyncDFSOutput.class); 082 083 private static DistributedFileSystem FS; 084 085 private static EventLoopGroup EVENT_LOOP_GROUP; 086 087 private static Class<? extends Channel> CHANNEL_CLASS; 088 089 private static int READ_TIMEOUT_MS = 200000; 090 091 private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath()); 092 093 private static MiniKdc KDC; 094 095 private static String HOST = "localhost"; 096 097 private static String USERNAME; 098 099 private static String PRINCIPAL; 100 101 private static String HTTP_PRINCIPAL; 102 103 private static String TEST_KEY_NAME = "test_key"; 104 105 @Rule 106 public TestName name = new TestName(); 107 108 @Parameter(0) 109 public String protection; 110 111 @Parameter(1) 112 public String encryptionAlgorithm; 113 114 @Parameter(2) 115 public String cipherSuite; 116 117 @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}") 118 public static Iterable<Object[]> data() { 119 List<Object[]> params = new ArrayList<>(); 120 for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { 121 for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { 122 for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) { 123 params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite }); 124 } 125 } 126 } 127 return params; 128 } 129 130 private static void setUpKeyProvider(Configuration conf) throws Exception { 131 URI keyProviderUri = 132 new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString()); 133 conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); 134 KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf); 135 keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf)); 136 keyProvider.flush(); 137 keyProvider.close(); 138 } 139 140 /** 141 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 142 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. 143 */ 144 private static MiniKdc setupMiniKdc(File keytabFile) throws Exception { 145 Properties conf = MiniKdc.createConf(); 146 conf.put(MiniKdc.DEBUG, true); 147 MiniKdc kdc = null; 148 File dir = null; 149 // There is time lag between selecting a port and trying to bind with it. It's possible that 150 // another service captures the port in between which'll result in BindException. 151 boolean bindException; 152 int numTries = 0; 153 do { 154 try { 155 bindException = false; 156 dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath()); 157 kdc = new MiniKdc(conf, dir); 158 kdc.start(); 159 } catch (BindException e) { 160 FileUtils.deleteDirectory(dir); // clean directory 161 numTries++; 162 if (numTries == 3) { 163 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 164 throw e; 165 } 166 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 167 bindException = true; 168 } 169 } while (bindException); 170 System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, 171 keytabFile.getAbsolutePath()); 172 return kdc; 173 } 174 175 @BeforeClass 176 public static void setUpBeforeClass() throws Exception { 177 EVENT_LOOP_GROUP = new NioEventLoopGroup(); 178 CHANNEL_CLASS = NioSocketChannel.class; 179 UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); 180 KDC = setupMiniKdc(KEYTAB_FILE); 181 USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); 182 PRINCIPAL = USERNAME + "/" + HOST; 183 HTTP_PRINCIPAL = "HTTP/" + HOST; 184 KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL); 185 186 setUpKeyProvider(UTIL.getConfiguration()); 187 HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(), 188 PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm()); 189 HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class); 190 } 191 192 @AfterClass 193 public static void tearDownAfterClass() throws IOException, InterruptedException { 194 if (EVENT_LOOP_GROUP != null) { 195 EVENT_LOOP_GROUP.shutdownGracefully().sync(); 196 } 197 if (KDC != null) { 198 KDC.stop(); 199 } 200 shutdownMiniDFSCluster(); 201 } 202 203 private Path testDirOnTestFs; 204 205 private Path entryptionTestDirOnTestFs; 206 207 private void createEncryptionZone() throws Exception { 208 Method method = 209 DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class); 210 method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME); 211 } 212 213 @Before 214 public void setUp() throws Exception { 215 UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); 216 if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) { 217 UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false); 218 } else { 219 UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true); 220 } 221 if (StringUtils.isBlank(encryptionAlgorithm)) { 222 UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); 223 } else { 224 UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm); 225 } 226 if (StringUtils.isBlank(cipherSuite)) { 227 UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 228 } else { 229 UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite); 230 } 231 232 startMiniDFSCluster(3); 233 FS = CLUSTER.getFileSystem(); 234 testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); 235 FS.mkdirs(testDirOnTestFs); 236 entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc"); 237 FS.mkdirs(entryptionTestDirOnTestFs); 238 createEncryptionZone(); 239 } 240 241 @After 242 public void tearDown() throws IOException { 243 shutdownMiniDFSCluster(); 244 } 245 246 private Path getTestFile() { 247 return new Path(testDirOnTestFs, "test"); 248 } 249 250 private Path getEncryptionTestFile() { 251 return new Path(entryptionTestDirOnTestFs, "test"); 252 } 253 254 private void test(Path file) throws IOException, InterruptedException, ExecutionException { 255 EventLoop eventLoop = EVENT_LOOP_GROUP.next(); 256 FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, 257 true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); 258 TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); 259 } 260 261 @Test 262 public void test() throws IOException, InterruptedException, ExecutionException { 263 test(getTestFile()); 264 test(getEncryptionTestFile()); 265 } 266}