001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertTrue;
023
024import java.io.Closeable;
025import java.io.File;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Map;
029import org.apache.commons.io.IOUtils;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.RegionLocator;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.io.Text;
040import org.apache.hadoop.mapreduce.Job;
041import org.apache.hadoop.minikdc.MiniKdc;
042import org.apache.hadoop.security.UserGroupInformation;
043import org.apache.hadoop.security.token.Token;
044import org.apache.hadoop.security.token.TokenIdentifier;
045import org.junit.After;
046import org.junit.Before;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051/**
052 * Tests for {@link HFileOutputFormat2} with secure mode.
053 */
054@Category({ VerySlowMapReduceTests.class, LargeTests.class })
055public class TestHFileOutputFormat2WithSecurity extends HFileOutputFormat2TestBase {
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestHFileOutputFormat2WithSecurity.class);
059
060  private static final byte[] FAMILIES = Bytes.toBytes("test_cf");
061
062  private static final String HTTP_PRINCIPAL = "HTTP/localhost";
063
064  private HBaseTestingUtil utilA;
065
066  private Configuration confA;
067
068  private HBaseTestingUtil utilB;
069
070  private MiniKdc kdc;
071
072  private List<Closeable> clusters = new ArrayList<>();
073
074  @Before
075  public void setupSecurityClusters() throws Exception {
076    utilA = new HBaseTestingUtil();
077    confA = utilA.getConfiguration();
078
079    utilB = new HBaseTestingUtil();
080
081    // Prepare security configs.
082    File keytab = new File(utilA.getDataTestDir("keytab").toUri().getPath());
083    kdc = utilA.setupMiniKdc(keytab);
084    String username = UserGroupInformation.getLoginUser().getShortUserName();
085    String userPrincipal = username + "/localhost";
086    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
087    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
088
089    // Start security clusterA
090    clusters.add(utilA.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
091
092    // Start security clusterB
093    clusters.add(utilB.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
094  }
095
096  @After
097  public void teardownSecurityClusters() {
098    IOUtils.closeQuietly(clusters);
099    clusters.clear();
100    if (kdc != null) {
101      kdc.stop();
102    }
103  }
104
105  @Test
106  public void testIncrementalLoadInMultiClusterWithSecurity() throws Exception {
107    TableName tableName = TableName.valueOf("testIncrementalLoadInMultiClusterWithSecurity");
108
109    // Create table in clusterB
110    try (Table table = utilB.createTable(tableName, FAMILIES);
111      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
112
113      // Create job in clusterA
114      Job job = Job.getInstance(confA, "testIncrementalLoadInMultiClusterWithSecurity");
115      job.setWorkingDirectory(
116        utilA.getDataTestDirOnTestFS("testIncrementalLoadInMultiClusterWithSecurity"));
117      setupRandomGeneratorMapper(job, false);
118      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
119
120      Map<Text, Token<? extends TokenIdentifier>> tokenMap = job.getCredentials().getTokenMap();
121      assertEquals(2, tokenMap.size());
122
123      String remoteClusterId = utilB.getHBaseClusterInterface().getClusterMetrics().getClusterId();
124      assertTrue(tokenMap.containsKey(new Text(remoteClusterId)));
125    } finally {
126      if (utilB.getAdmin().tableExists(tableName)) {
127        utilB.deleteTable(tableName);
128      }
129    }
130  }
131}