001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.Closeable;
026import java.io.File;
027import java.util.Collection;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
033import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
034import org.apache.hadoop.hbase.security.access.AccessController;
035import org.apache.hadoop.hbase.security.access.PermissionStorage;
036import org.apache.hadoop.hbase.security.access.SecureTestUtil;
037import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
038import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
039import org.apache.hadoop.hbase.security.token.TokenProvider;
040import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
041import org.apache.hadoop.hbase.testclassification.MapReduceTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
045import org.apache.hadoop.io.LongWritable;
046import org.apache.hadoop.io.Text;
047import org.apache.hadoop.mapreduce.Job;
048import org.apache.hadoop.minikdc.MiniKdc;
049import org.apache.hadoop.security.Credentials;
050import org.apache.hadoop.security.UserGroupInformation;
051import org.apache.hadoop.security.authentication.util.KerberosName;
052import org.apache.hadoop.security.token.Token;
053import org.apache.hadoop.security.token.TokenIdentifier;
054import org.junit.After;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058
059/**
060 * Test different variants of initTableMapperJob method
061 */
062@Category({ MapReduceTests.class, MediumTests.class })
063public class TestTableMapReduceUtil {
064  private static final String HTTP_PRINCIPAL = "HTTP/localhost";
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);
069
070  @After
071  public void after() {
072    SaslClientAuthenticationProviders.reset();
073  }
074
075  /*
076   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method
077   * depends on an online cluster.
078   */
079
080  @Test
081  public void testInitTableMapperJob1() throws Exception {
082    Configuration configuration = new Configuration();
083    Job job = Job.getInstance(configuration, "tableName");
084    // test
085    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
086      Text.class, job, false, WALInputFormat.class);
087    assertEquals(WALInputFormat.class, job.getInputFormatClass());
088    assertEquals(Import.Importer.class, job.getMapperClass());
089    assertEquals(LongWritable.class, job.getOutputKeyClass());
090    assertEquals(Text.class, job.getOutputValueClass());
091    assertNull(job.getCombinerClass());
092    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
093  }
094
095  @Test
096  public void testInitTableMapperJob2() throws Exception {
097    Configuration configuration = new Configuration();
098    Job job = Job.getInstance(configuration, "tableName");
099    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
100      Text.class, Text.class, job, false, WALInputFormat.class);
101    assertEquals(WALInputFormat.class, job.getInputFormatClass());
102    assertEquals(Import.Importer.class, job.getMapperClass());
103    assertEquals(LongWritable.class, job.getOutputKeyClass());
104    assertEquals(Text.class, job.getOutputValueClass());
105    assertNull(job.getCombinerClass());
106    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
107  }
108
109  @Test
110  public void testInitTableMapperJob3() throws Exception {
111    Configuration configuration = new Configuration();
112    Job job = Job.getInstance(configuration, "tableName");
113    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
114      Text.class, Text.class, job);
115    assertEquals(TableInputFormat.class, job.getInputFormatClass());
116    assertEquals(Import.Importer.class, job.getMapperClass());
117    assertEquals(LongWritable.class, job.getOutputKeyClass());
118    assertEquals(Text.class, job.getOutputValueClass());
119    assertNull(job.getCombinerClass());
120    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
121  }
122
123  @Test
124  public void testInitTableMapperJob4() throws Exception {
125    Configuration configuration = new Configuration();
126    Job job = Job.getInstance(configuration, "tableName");
127    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
128      Text.class, Text.class, job, false);
129    assertEquals(TableInputFormat.class, job.getInputFormatClass());
130    assertEquals(Import.Importer.class, job.getMapperClass());
131    assertEquals(LongWritable.class, job.getOutputKeyClass());
132    assertEquals(Text.class, job.getOutputValueClass());
133    assertNull(job.getCombinerClass());
134    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
135  }
136
137  private static Closeable startSecureMiniCluster(HBaseTestingUtility util, MiniKdc kdc,
138    String principal) throws Exception {
139    Configuration conf = util.getConfiguration();
140
141    SecureTestUtil.enableSecurity(conf);
142    VisibilityTestUtil.enableVisiblityLabels(conf);
143    SecureTestUtil.verifyConfiguration(conf);
144
145    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
146      AccessController.class.getName() + ',' + TokenProvider.class.getName());
147
148    HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(),
149      HTTP_PRINCIPAL + '@' + kdc.getRealm());
150
151    KerberosName.resetDefaultRealm();
152
153    util.startMiniCluster();
154    try {
155      util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
156    } catch (Exception e) {
157      util.shutdownMiniCluster();
158      throw e;
159    }
160
161    return util::shutdownMiniCluster;
162  }
163
164  @Test
165  public void testInitCredentialsForCluster1() throws Exception {
166    HBaseTestingUtility util1 = new HBaseTestingUtility();
167    HBaseTestingUtility util2 = new HBaseTestingUtility();
168
169    util1.startMiniCluster();
170    try {
171      util2.startMiniCluster();
172      try {
173        Configuration conf1 = util1.getConfiguration();
174        Job job = Job.getInstance(conf1);
175
176        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
177
178        Credentials credentials = job.getCredentials();
179        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
180        assertTrue(tokens.isEmpty());
181      } finally {
182        util2.shutdownMiniCluster();
183      }
184    } finally {
185      util1.shutdownMiniCluster();
186    }
187  }
188
189  @Test
190  @SuppressWarnings("unchecked")
191  public void testInitCredentialsForCluster2() throws Exception {
192    HBaseTestingUtility util1 = new HBaseTestingUtility();
193    HBaseTestingUtility util2 = new HBaseTestingUtility();
194
195    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
196    MiniKdc kdc = util1.setupMiniKdc(keytab);
197    String username = UserGroupInformation.getLoginUser().getShortUserName();
198    String userPrincipal = username + "/localhost";
199    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
200    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
201
202    try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal);
203      Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
204      try {
205        Configuration conf1 = util1.getConfiguration();
206        Job job = Job.getInstance(conf1);
207
208        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
209
210        Credentials credentials = job.getCredentials();
211        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
212        assertEquals(1, tokens.size());
213
214        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
215        Token<AuthenticationTokenIdentifier> tokenForCluster =
216          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
217        assertEquals(userPrincipal + '@' + kdc.getRealm(),
218          tokenForCluster.decodeIdentifier().getUsername());
219      } finally {
220        kdc.stop();
221      }
222    }
223  }
224
225  @Test
226  public void testInitCredentialsForCluster3() throws Exception {
227    HBaseTestingUtility util1 = new HBaseTestingUtility();
228
229    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
230    MiniKdc kdc = util1.setupMiniKdc(keytab);
231    String username = UserGroupInformation.getLoginUser().getShortUserName();
232    String userPrincipal = username + "/localhost";
233    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
234    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
235
236    try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal)) {
237      try {
238        HBaseTestingUtility util2 = new HBaseTestingUtility();
239        // Assume util2 is insecure cluster
240        // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
241        // once
242
243        Configuration conf1 = util1.getConfiguration();
244        Job job = Job.getInstance(conf1);
245
246        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
247
248        Credentials credentials = job.getCredentials();
249        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
250        assertTrue(tokens.isEmpty());
251      } finally {
252        kdc.stop();
253      }
254    }
255  }
256
257  @Test
258  @SuppressWarnings("unchecked")
259  public void testInitCredentialsForCluster4() throws Exception {
260    HBaseTestingUtility util1 = new HBaseTestingUtility();
261    // Assume util1 is insecure cluster
262    // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once
263
264    HBaseTestingUtility util2 = new HBaseTestingUtility();
265    File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
266    MiniKdc kdc = util2.setupMiniKdc(keytab);
267    String username = UserGroupInformation.getLoginUser().getShortUserName();
268    String userPrincipal = username + "/localhost";
269    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
270    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
271
272    try (Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
273      try {
274        Configuration conf1 = util1.getConfiguration();
275        Job job = Job.getInstance(conf1);
276
277        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
278
279        Credentials credentials = job.getCredentials();
280        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
281        assertEquals(1, tokens.size());
282
283        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
284        Token<AuthenticationTokenIdentifier> tokenForCluster =
285          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
286        assertEquals(userPrincipal + '@' + kdc.getRealm(),
287          tokenForCluster.decodeIdentifier().getUsername());
288      } finally {
289        kdc.stop();
290      }
291    }
292  }
293}