001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.Closeable;
025import java.io.File;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Map;
029import org.apache.commons.io.IOUtils;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.RegionLocator;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.io.Text;
039import org.apache.hadoop.mapreduce.Job;
040import org.apache.hadoop.minikdc.MiniKdc;
041import org.apache.hadoop.security.UserGroupInformation;
042import org.apache.hadoop.security.token.Token;
043import org.apache.hadoop.security.token.TokenIdentifier;
044import org.junit.jupiter.api.AfterEach;
045import org.junit.jupiter.api.BeforeEach;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048
049/**
050 * Tests for {@link HFileOutputFormat2} with secure mode.
051 */
052@Tag(VerySlowMapReduceTests.TAG)
053@Tag(LargeTests.TAG)
054public class TestHFileOutputFormat2WithSecurity extends HFileOutputFormat2TestBase {
055
056  private static final byte[] FAMILIES = Bytes.toBytes("test_cf");
057
058  private static final String HTTP_PRINCIPAL = "HTTP/localhost";
059
060  private HBaseTestingUtil utilA;
061
062  private Configuration confA;
063
064  private HBaseTestingUtil utilB;
065
066  private MiniKdc kdc;
067
068  private List<Closeable> clusters = new ArrayList<>();
069
070  @BeforeEach
071  public void setupSecurityClusters() throws Exception {
072    utilA = new HBaseTestingUtil();
073    confA = utilA.getConfiguration();
074
075    utilB = new HBaseTestingUtil();
076
077    // Prepare security configs.
078    File keytab = new File(utilA.getDataTestDir("keytab").toUri().getPath());
079    kdc = utilA.setupMiniKdc(keytab);
080    String username = UserGroupInformation.getLoginUser().getShortUserName();
081    String userPrincipal = username + "/localhost";
082    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
083    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
084
085    // Start security clusterA
086    clusters.add(utilA.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
087
088    // Start security clusterB
089    clusters.add(utilB.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
090  }
091
092  @AfterEach
093  public void teardownSecurityClusters() {
094    IOUtils.closeQuietly(clusters);
095    clusters.clear();
096    if (kdc != null) {
097      kdc.stop();
098    }
099  }
100
101  @Test
102  public void testIncrementalLoadInMultiClusterWithSecurity() throws Exception {
103    TableName tableName = TableName.valueOf("testIncrementalLoadInMultiClusterWithSecurity");
104
105    // Create table in clusterB
106    try (Table table = utilB.createTable(tableName, FAMILIES);
107      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
108
109      // Create job in clusterA
110      Job job = Job.getInstance(confA, "testIncrementalLoadInMultiClusterWithSecurity");
111      job.setWorkingDirectory(
112        utilA.getDataTestDirOnTestFS("testIncrementalLoadInMultiClusterWithSecurity"));
113      setupRandomGeneratorMapper(job, false);
114      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
115
116      Map<Text, Token<? extends TokenIdentifier>> tokenMap = job.getCredentials().getTokenMap();
117      assertEquals(2, tokenMap.size());
118
119      String remoteClusterId = utilB.getHBaseClusterInterface().getClusterMetrics().getClusterId();
120      assertTrue(tokenMap.containsKey(new Text(remoteClusterId)));
121    } finally {
122      if (utilB.getAdmin().tableExists(tableName)) {
123        utilB.deleteTable(tableName);
124      }
125    }
126  }
127}