001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.File;
023import java.io.IOException;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.function.Supplier;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
032import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
033import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
034import org.apache.hadoop.hbase.security.access.AccessController;
035import org.apache.hadoop.hbase.security.access.SecureTestUtil;
036import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
037import org.apache.hadoop.hbase.security.token.TokenProvider;
038import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
039import org.apache.hadoop.hbase.testclassification.LargeTests;
040import org.apache.hadoop.hbase.testclassification.ReplicationTests;
041import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
042import org.apache.hadoop.hbase.zookeeper.ZKConfig;
043import org.apache.hadoop.io.Text;
044import org.apache.hadoop.mapreduce.Job;
045import org.apache.hadoop.minikdc.MiniKdc;
046import org.apache.hadoop.security.Credentials;
047import org.apache.hadoop.security.UserGroupInformation;
048import org.apache.hadoop.security.token.Token;
049import org.apache.hadoop.security.token.TokenIdentifier;
050import org.junit.AfterClass;
051import org.junit.BeforeClass;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.junit.runner.RunWith;
056import org.junit.runners.Parameterized;
057import org.junit.runners.Parameterized.Parameter;
058import org.junit.runners.Parameterized.Parameters;
059
060@Category({ ReplicationTests.class, LargeTests.class })
061@RunWith(Parameterized.class)
062public class TestVerifyReplicationSecureClusterCredentials {
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065    HBaseClassTestRule.forClass(TestVerifyReplicationSecureClusterCredentials.class);
066
067  private static MiniKdc KDC;
068  private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
069  private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
070
071  private static final File KEYTAB_FILE =
072    new File(UTIL1.getDataTestDir("keytab").toUri().getPath());
073
074  private static final String LOCALHOST = "localhost";
075  private static String CLUSTER_PRINCIPAL;
076  private static String FULL_USER_PRINCIPAL;
077  private static String HTTP_PRINCIPAL;
078
079  private static void setUpKdcServer() throws Exception {
080    KDC = UTIL1.setupMiniKdc(KEYTAB_FILE);
081    String username = UserGroupInformation.getLoginUser().getShortUserName();
082    String userPrincipal = username + '/' + LOCALHOST;
083    CLUSTER_PRINCIPAL = userPrincipal;
084    FULL_USER_PRINCIPAL = userPrincipal + '@' + KDC.getRealm();
085    HTTP_PRINCIPAL = "HTTP/" + LOCALHOST;
086    KDC.createPrincipal(KEYTAB_FILE, CLUSTER_PRINCIPAL, HTTP_PRINCIPAL);
087  }
088
089  private static void setupCluster(HBaseTestingUtility util) throws Exception {
090    Configuration conf = util.getConfiguration();
091
092    SecureTestUtil.enableSecurity(conf);
093    VisibilityTestUtil.enableVisiblityLabels(conf);
094    SecureTestUtil.verifyConfiguration(conf);
095
096    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
097      AccessController.class.getName() + ',' + TokenProvider.class.getName());
098
099    HBaseKerberosUtils.setSecuredConfiguration(conf, CLUSTER_PRINCIPAL + '@' + KDC.getRealm(),
100      HTTP_PRINCIPAL + '@' + KDC.getRealm());
101
102    util.startMiniCluster();
103  }
104
105  /**
106   * Sets the security firstly for getting the correct default realm.
107   */
108  @BeforeClass
109  public static void beforeClass() throws Exception {
110    setUpKdcServer();
111    setupCluster(UTIL1);
112    setupCluster(UTIL2);
113
114    try (Admin admin = UTIL1.getAdmin()) {
115      admin.addReplicationPeer("1",
116        ReplicationPeerConfig.newBuilder()
117          .setClusterKey(ZKConfig.getZooKeeperClusterKey(UTIL2.getConfiguration()))
118          .putConfiguration(HBaseKerberosUtils.KRB_PRINCIPAL,
119            UTIL2.getConfiguration().get(HBaseKerberosUtils.KRB_PRINCIPAL))
120          .putConfiguration(HBaseKerberosUtils.MASTER_KRB_PRINCIPAL,
121            UTIL2.getConfiguration().get(HBaseKerberosUtils.MASTER_KRB_PRINCIPAL))
122          .build());
123    }
124  }
125
126  @AfterClass
127  public static void cleanup() throws IOException {
128    UTIL1.shutdownMiniCluster();
129    UTIL2.shutdownMiniCluster();
130  }
131
132  @Parameters
133  public static Collection<Supplier<String>> peer() {
134    return Arrays.asList(() -> "1",
135      () -> ZKConfig.getZooKeeperClusterKey(UTIL2.getConfiguration()));
136  }
137
138  @Parameter
139  public Supplier<String> peer;
140
141  @Test
142  @SuppressWarnings("unchecked")
143  public void testJobCredentials() throws Exception {
144    Job job = new VerifyReplication().createSubmittableJob(
145      new Configuration(UTIL1.getConfiguration()), new String[] { peer.get(), "table" });
146
147    Credentials credentials = job.getCredentials();
148    Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
149    assertEquals(2, tokens.size());
150
151    String clusterId1 = ZKClusterId.readClusterIdZNode(UTIL1.getZooKeeperWatcher());
152    Token<AuthenticationTokenIdentifier> tokenForCluster1 =
153      (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId1));
154    assertEquals(FULL_USER_PRINCIPAL, tokenForCluster1.decodeIdentifier().getUsername());
155
156    String clusterId2 = ZKClusterId.readClusterIdZNode(UTIL2.getZooKeeperWatcher());
157    Token<AuthenticationTokenIdentifier> tokenForCluster2 =
158      (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId2));
159    assertEquals(FULL_USER_PRINCIPAL, tokenForCluster2.decodeIdentifier().getUsername());
160  }
161}