001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
021import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
022import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
023import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
024
025import java.io.File;
026import java.io.IOException;
027import java.net.BindException;
028import java.net.URI;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.Properties;
033import java.util.concurrent.ExecutionException;
034import java.util.stream.Stream;
035import org.apache.commons.io.FileUtils;
036import org.apache.commons.lang3.StringUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.crypto.CipherSuite;
039import org.apache.hadoop.crypto.key.KeyProvider;
040import org.apache.hadoop.crypto.key.KeyProviderFactory;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
043import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
044import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
045import org.apache.hadoop.hbase.security.SecurityConstants;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.testclassification.MiscTests;
048import org.apache.hadoop.hdfs.DistributedFileSystem;
049import org.apache.hadoop.minikdc.MiniKdc;
050import org.apache.hadoop.security.UserGroupInformation;
051import org.junit.jupiter.api.AfterAll;
052import org.junit.jupiter.api.AfterEach;
053import org.junit.jupiter.api.BeforeAll;
054import org.junit.jupiter.api.BeforeEach;
055import org.junit.jupiter.api.Tag;
056import org.junit.jupiter.api.TestInfo;
057import org.junit.jupiter.api.TestTemplate;
058import org.junit.jupiter.params.provider.Arguments;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.io.netty.channel.Channel;
063import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
064import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
065import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
066import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
067
068@Tag(MiscTests.TAG)
069@Tag(LargeTests.TAG)
070@HBaseParameterizedTestTemplate(name = "[{index}] protection = {0}, encryption = {1}, cipher = {2}")
071public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
072
073  private static final Logger LOG =
074    LoggerFactory.getLogger(TestSaslFanOutOneBlockAsyncDFSOutput.class);
075
076  private static DistributedFileSystem FS;
077
078  private static EventLoopGroup EVENT_LOOP_GROUP;
079
080  private static Class<? extends Channel> CHANNEL_CLASS;
081
082  private static int READ_TIMEOUT_MS = 200000;
083
084  private static final File KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
085
086  private static MiniKdc KDC;
087
088  private static String HOST = "localhost";
089
090  private static String USERNAME;
091
092  private static String PRINCIPAL;
093
094  private static String HTTP_PRINCIPAL;
095
096  private static String TEST_KEY_NAME = "test_key";
097
098  private static StreamSlowMonitor MONITOR;
099
100  private String protection;
101  private String encryptionAlgorithm;
102  private String cipherSuite;
103
104  public TestSaslFanOutOneBlockAsyncDFSOutput(String protection, String encryptionAlgorithm,
105    String cipherSuite) {
106    this.protection = protection;
107    this.encryptionAlgorithm = encryptionAlgorithm;
108    this.cipherSuite = cipherSuite;
109  }
110
111  public static Stream<Arguments> parameters() {
112    List<Arguments> params = new ArrayList<>();
113    for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
114      for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
115        for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
116          params.add(Arguments.of(protection, encryptionAlgorithm, cipherSuite));
117        }
118      }
119    }
120    return params.stream();
121  }
122
123  private static void setUpKeyProvider(Configuration conf) throws Exception {
124    URI keyProviderUri =
125      new URI("jceks://file" + UTIL.getDataTestDir("test.jks").toUri().toString());
126    conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
127    KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
128    keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
129    keyProvider.flush();
130    keyProvider.close();
131  }
132
133  /**
134   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
135   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
136   */
137  private static MiniKdc setupMiniKdc(File keytabFile) throws Exception {
138    Properties conf = MiniKdc.createConf();
139    conf.put(MiniKdc.DEBUG, true);
140    MiniKdc kdc = null;
141    File dir = null;
142    // There is time lag between selecting a port and trying to bind with it. It's possible that
143    // another service captures the port in between which'll result in BindException.
144    boolean bindException;
145    int numTries = 0;
146    do {
147      try {
148        bindException = false;
149        dir = new File(UTIL.getDataTestDir("kdc").toUri().getPath());
150        kdc = new MiniKdc(conf, dir);
151        kdc.start();
152      } catch (BindException e) {
153        FileUtils.deleteDirectory(dir); // clean directory
154        numTries++;
155        if (numTries == 3) {
156          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
157          throw e;
158        }
159        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
160        bindException = true;
161      }
162    } while (bindException);
163    System.setProperty(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
164      keytabFile.getAbsolutePath());
165    return kdc;
166  }
167
168  @BeforeAll
169  public static void setUpBeforeClass() throws Exception {
170    EVENT_LOOP_GROUP = new NioEventLoopGroup();
171    CHANNEL_CLASS = NioSocketChannel.class;
172    UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
173    KDC = setupMiniKdc(KEYTAB_FILE);
174    USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
175    PRINCIPAL = USERNAME + "/" + HOST;
176    HTTP_PRINCIPAL = "HTTP/" + HOST;
177    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
178
179    setUpKeyProvider(UTIL.getConfiguration());
180    HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
181      PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
182    HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
183    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
184  }
185
186  @AfterAll
187  public static void tearDownAfterClass() throws Exception {
188    if (EVENT_LOOP_GROUP != null) {
189      EVENT_LOOP_GROUP.shutdownGracefully().get();
190    }
191    if (KDC != null) {
192      KDC.stop();
193    }
194    shutdownMiniDFSCluster();
195  }
196
197  private Path testDirOnTestFs;
198
199  private Path entryptionTestDirOnTestFs;
200
201  private void createEncryptionZone() throws Exception {
202    FS.createEncryptionZone(entryptionTestDirOnTestFs, TEST_KEY_NAME);
203  }
204
205  @BeforeEach
206  public void setUp(TestInfo testInfo) throws Exception {
207    UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
208    if (StringUtils.isBlank(encryptionAlgorithm) && StringUtils.isBlank(cipherSuite)) {
209      UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false);
210    } else {
211      UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
212    }
213    if (StringUtils.isBlank(encryptionAlgorithm)) {
214      UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
215    } else {
216      UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm);
217    }
218    if (StringUtils.isBlank(cipherSuite)) {
219      UTIL.getConfiguration().unset(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
220    } else {
221      UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
222    }
223
224    startMiniDFSCluster(3);
225    FS = CLUSTER.getFileSystem();
226    testDirOnTestFs = new Path("/" + testInfo.getDisplayName().replaceAll("[^0-9a-zA-Z]", "_"));
227    FS.mkdirs(testDirOnTestFs);
228    entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
229    FS.mkdirs(entryptionTestDirOnTestFs);
230    createEncryptionZone();
231  }
232
233  @AfterEach
234  public void tearDown() throws IOException {
235    shutdownMiniDFSCluster();
236  }
237
238  private Path getTestFile() {
239    return new Path(testDirOnTestFs, "test");
240  }
241
242  private Path getEncryptionTestFile() {
243    return new Path(entryptionTestDirOnTestFs, "test");
244  }
245
246  private void test(Path file) throws IOException, InterruptedException, ExecutionException {
247    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
248    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
249      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
250    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
251  }
252
253  @TestTemplate
254  public void test() throws Exception {
255    test(getTestFile());
256    test(getEncryptionTestFile());
257  }
258}