001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; 022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; 024 025import java.io.File; 026import java.io.IOException; 027import java.net.BindException; 028import java.net.URI; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.List; 032import java.util.Properties; 033import java.util.concurrent.ExecutionException; 034import java.util.stream.Stream; 035import org.apache.commons.io.FileUtils; 036import org.apache.commons.lang3.StringUtils; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.crypto.CipherSuite; 039import org.apache.hadoop.crypto.key.KeyProvider; 040import org.apache.hadoop.crypto.key.KeyProviderFactory; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 043import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 044import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 045import org.apache.hadoop.hbase.security.SecurityConstants; 046import org.apache.hadoop.hbase.testclassification.LargeTests; 047import org.apache.hadoop.hbase.testclassification.MiscTests; 048import org.apache.hadoop.hdfs.DistributedFileSystem; 049import org.apache.hadoop.minikdc.MiniKdc; 050import org.apache.hadoop.security.UserGroupInformation; 051import org.junit.jupiter.api.AfterAll; 052import org.junit.jupiter.api.AfterEach; 053import org.junit.jupiter.api.BeforeAll; 054import org.junit.jupiter.api.BeforeEach; 055import org.junit.jupiter.api.Tag; 056import org.junit.jupiter.api.TestInfo; 057import org.junit.jupiter.api.TestTemplate; 058import org.junit.jupiter.params.provider.Arguments; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.io.netty.channel.Channel; 063import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 064import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 065import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; 066import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; 067 068@Tag(MiscTests.TAG) 069@Tag(LargeTests.TAG) 070@HBaseParameterizedTestTemplate(name = "[{index}] protection = {0}, encryption = {1}, cipher = {2}") 071public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { 072 073 private static final Logger LOG = 074 LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class); 075 076 private static DistributedFileSystem FS; 077 078 private static EventLoopGroup EVENT_LOOP_GROUP; 079 080 private static Class<? extends Channel> CHANNEL_CLASS; 081 082 private static int READ_TIMEOUT_MS = 200000; 083 084 private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath()); 085 086 private static MiniKdc KDC; 087 088 private static String HOST = "localhost"; 089 090 private static String USERNAME; 091 092 private static String PRINCIPAL; 093 094 private static String HTTP_PRINCIPAL; 095 096 private static String TEST_KEY_NAME = "test_key"; 097 098 private static StreamSlowMonitor MONITOR; 099 100 private String protection; 101 private String encryptionAlgorithm; 102 private String cipherSuite; 103 104 public TestSaslFanOutOneBlockAsyncDFSOutput(String protection, String encryptionAlgorithm, 105 String cipherSuite) { 106 this.protection = protection; 107 this.encryptionAlgorithm = encryptionAlgorithm; 108 this.cipherSuite = cipherSuite; 109 } 110 111 public static Stream<Arguments> parameters() { 112 List<Arguments> params = new ArrayList<>(); 113 for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { 114 for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { 115 for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) { 116 params.add(Arguments.of(protection, encryptionAlgorithm, cipherSuite)); 117 } 118 } 119 } 120 return params.stream(); 121 } 122 123 private static void setUpKeyProvider(Configuration conf) throws Exception { 124 URI keyProviderUri = 125 new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString()); 126 conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString()); 127 KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf); 128 keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf)); 129 keyProvider.flush(); 130 keyProvider.close(); 131 } 132 133 /** 134 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 135 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. 136 */ 137 private static MiniKdc setupMiniKdc(File keytabFile) throws Exception { 138 Properties conf = MiniKdc.createConf(); 139 conf.put(MiniKdc.DEBUG, true); 140 MiniKdc kdc = null; 141 File dir = null; 142 // There is time lag between selecting a port and trying to bind with it. It's possible that 143 // another service captures the port in between which'll result in BindException. 144 boolean bindException; 145 int numTries = 0; 146 do { 147 try { 148 bindException = false; 149 dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath()); 150 kdc = new MiniKdc(conf, dir); 151 kdc.start(); 152 } catch (BindException e) { 153 FileUtils.deleteDirectory(dir); // clean directory 154 numTries++; 155 if (numTries == 3) { 156 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 157 throw e; 158 } 159 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 160 bindException = true; 161 } 162 } while (bindException); 163 System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, 164 keytabFile.getAbsolutePath()); 165 return kdc; 166 } 167 168 @BeforeAll 169 public static void setUpBeforeClass() throws Exception { 170 EVENT_LOOP_GROUP = new NioEventLoopGroup(); 171 CHANNEL_CLASS = NioSocketChannel.class; 172 UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); 173 KDC = setupMiniKdc(KEYTAB_FILE); 174 USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); 175 PRINCIPAL = USERNAME + "/" + HOST; 176 HTTP_PRINCIPAL = "HTTP/" + HOST; 177 KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL); 178 179 setUpKeyProvider(UTIL.getConfiguration()); 180 HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(), 181 PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm()); 182 HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class); 183 MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor"); 184 } 185 186 @AfterAll 187 public static void tearDownAfterClass() throws Exception { 188 if (EVENT_LOOP_GROUP != null) { 189 EVENT_LOOP_GROUP.shutdownGracefully().get(); 190 } 191 if (KDC != null) { 192 KDC.stop(); 193 } 194 shutdownMiniDFSCluster(); 195 } 196 197 private Path testDirOnTestFs; 198 199 private Path entryptionTestDirOnTestFs; 200 201 private void createEncryptionZone() throws Exception { 202 FS.createEncryptionZone(entryptionTestDirOnTestFs, TEST_KEY_NAME); 203 } 204 205 @BeforeEach 206 public void setUp(TestInfo testInfo) throws Exception { 207 UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); 208 if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) { 209 UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false); 210 } else { 211 UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true); 212 } 213 if (StringUtils.isBlank(encryptionAlgorithm)) { 214 UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); 215 } else { 216 UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm); 217 } 218 if (StringUtils.isBlank(cipherSuite)) { 219 UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 220 } else { 221 UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite); 222 } 223 224 startMiniDFSCluster(3); 225 FS = CLUSTER.getFileSystem(); 226 testDirOnTestFs = new Path("/" + testInfo.getDisplayName().replaceAll("[^0-9a-zA-Z]", "_")); 227 FS.mkdirs(testDirOnTestFs); 228 entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc"); 229 FS.mkdirs(entryptionTestDirOnTestFs); 230 createEncryptionZone(); 231 } 232 233 @AfterEach 234 public void tearDown() throws IOException { 235 shutdownMiniDFSCluster(); 236 } 237 238 private Path getTestFile() { 239 return new Path(testDirOnTestFs, "test"); 240 } 241 242 private Path getEncryptionTestFile() { 243 return new Path(entryptionTestDirOnTestFs, "test"); 244 } 245 246 private void test(Path file) throws IOException, InterruptedException, ExecutionException { 247 EventLoop eventLoop = EVENT_LOOP_GROUP.next(); 248 FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, 249 true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true); 250 TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); 251 } 252 253 @TestTemplate 254 public void test() throws Exception { 255 test(getTestFile()); 256 test(getEncryptionTestFile()); 257 } 258}