001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.Closeable;
026import java.io.File;
027import java.util.Collection;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
033import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
034import org.apache.hadoop.hbase.security.access.AccessController;
035import org.apache.hadoop.hbase.security.access.PermissionStorage;
036import org.apache.hadoop.hbase.security.access.SecureTestUtil;
037import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
038import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
039import org.apache.hadoop.hbase.security.token.TokenProvider;
040import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
041import org.apache.hadoop.hbase.testclassification.MapReduceTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
045import org.apache.hadoop.io.LongWritable;
046import org.apache.hadoop.io.Text;
047import org.apache.hadoop.mapreduce.Job;
048import org.apache.hadoop.minikdc.MiniKdc;
049import org.apache.hadoop.security.Credentials;
050import org.apache.hadoop.security.UserGroupInformation;
051import org.apache.hadoop.security.token.Token;
052import org.apache.hadoop.security.token.TokenIdentifier;
053import org.junit.After;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057
058/**
059 * Test different variants of initTableMapperJob method
060 */
061@Category({ MapReduceTests.class, MediumTests.class })
062public class TestTableMapReduceUtil {
063  private static final String HTTP_PRINCIPAL = "HTTP/localhost";
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);
068
069  @After
070  public void after() {
071    SaslClientAuthenticationProviders.reset();
072  }
073
074  /*
075   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method
076   * depends on an online cluster.
077   */
078
079  @Test
080  public void testInitTableMapperJob1() throws Exception {
081    Configuration configuration = new Configuration();
082    Job job = Job.getInstance(configuration, "tableName");
083    // test
084    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
085      Text.class, job, false, WALInputFormat.class);
086    assertEquals(WALInputFormat.class, job.getInputFormatClass());
087    assertEquals(Import.Importer.class, job.getMapperClass());
088    assertEquals(LongWritable.class, job.getOutputKeyClass());
089    assertEquals(Text.class, job.getOutputValueClass());
090    assertNull(job.getCombinerClass());
091    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
092  }
093
094  @Test
095  public void testInitTableMapperJob2() throws Exception {
096    Configuration configuration = new Configuration();
097    Job job = Job.getInstance(configuration, "tableName");
098    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
099      Text.class, Text.class, job, false, WALInputFormat.class);
100    assertEquals(WALInputFormat.class, job.getInputFormatClass());
101    assertEquals(Import.Importer.class, job.getMapperClass());
102    assertEquals(LongWritable.class, job.getOutputKeyClass());
103    assertEquals(Text.class, job.getOutputValueClass());
104    assertNull(job.getCombinerClass());
105    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
106  }
107
108  @Test
109  public void testInitTableMapperJob3() throws Exception {
110    Configuration configuration = new Configuration();
111    Job job = Job.getInstance(configuration, "tableName");
112    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
113      Text.class, Text.class, job);
114    assertEquals(TableInputFormat.class, job.getInputFormatClass());
115    assertEquals(Import.Importer.class, job.getMapperClass());
116    assertEquals(LongWritable.class, job.getOutputKeyClass());
117    assertEquals(Text.class, job.getOutputValueClass());
118    assertNull(job.getCombinerClass());
119    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
120  }
121
122  @Test
123  public void testInitTableMapperJob4() throws Exception {
124    Configuration configuration = new Configuration();
125    Job job = Job.getInstance(configuration, "tableName");
126    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
127      Text.class, Text.class, job, false);
128    assertEquals(TableInputFormat.class, job.getInputFormatClass());
129    assertEquals(Import.Importer.class, job.getMapperClass());
130    assertEquals(LongWritable.class, job.getOutputKeyClass());
131    assertEquals(Text.class, job.getOutputValueClass());
132    assertNull(job.getCombinerClass());
133    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
134  }
135
136  private static Closeable startSecureMiniCluster(HBaseTestingUtil util, MiniKdc kdc,
137    String principal) throws Exception {
138    Configuration conf = util.getConfiguration();
139
140    SecureTestUtil.enableSecurity(conf);
141    VisibilityTestUtil.enableVisiblityLabels(conf);
142    SecureTestUtil.verifyConfiguration(conf);
143
144    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
145      AccessController.class.getName() + ',' + TokenProvider.class.getName());
146
147    HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(),
148      HTTP_PRINCIPAL + '@' + kdc.getRealm());
149
150    util.startMiniCluster();
151    try {
152      util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
153    } catch (Exception e) {
154      util.shutdownMiniCluster();
155      throw e;
156    }
157
158    return util::shutdownMiniCluster;
159  }
160
161  @Test
162  public void testInitCredentialsForCluster1() throws Exception {
163    HBaseTestingUtil util1 = new HBaseTestingUtil();
164    HBaseTestingUtil util2 = new HBaseTestingUtil();
165
166    util1.startMiniCluster();
167    try {
168      util2.startMiniCluster();
169      try {
170        Configuration conf1 = util1.getConfiguration();
171        Job job = Job.getInstance(conf1);
172
173        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
174
175        Credentials credentials = job.getCredentials();
176        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
177        assertTrue(tokens.isEmpty());
178      } finally {
179        util2.shutdownMiniCluster();
180      }
181    } finally {
182      util1.shutdownMiniCluster();
183    }
184  }
185
186  @Test
187  @SuppressWarnings("unchecked")
188  public void testInitCredentialsForCluster2() throws Exception {
189    HBaseTestingUtil util1 = new HBaseTestingUtil();
190    HBaseTestingUtil util2 = new HBaseTestingUtil();
191
192    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
193    MiniKdc kdc = util1.setupMiniKdc(keytab);
194    try {
195      String username = UserGroupInformation.getLoginUser().getShortUserName();
196      String userPrincipal = username + "/localhost";
197      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
198      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
199
200      try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal);
201        Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
202        Configuration conf1 = util1.getConfiguration();
203        Job job = Job.getInstance(conf1);
204
205        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
206
207        Credentials credentials = job.getCredentials();
208        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
209        assertEquals(1, tokens.size());
210
211        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
212        Token<AuthenticationTokenIdentifier> tokenForCluster =
213          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
214        assertEquals(userPrincipal + '@' + kdc.getRealm(),
215          tokenForCluster.decodeIdentifier().getUsername());
216      }
217    } finally {
218      kdc.stop();
219    }
220  }
221
222  @Test
223  public void testInitCredentialsForCluster3() throws Exception {
224    HBaseTestingUtil util1 = new HBaseTestingUtil();
225
226    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
227    MiniKdc kdc = util1.setupMiniKdc(keytab);
228    try {
229      String username = UserGroupInformation.getLoginUser().getShortUserName();
230      String userPrincipal = username + "/localhost";
231      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
232      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
233
234      try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal)) {
235        HBaseTestingUtil util2 = new HBaseTestingUtil();
236        // Assume util2 is insecure cluster
237        // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
238        // once
239
240        Configuration conf1 = util1.getConfiguration();
241        Job job = Job.getInstance(conf1);
242
243        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
244
245        Credentials credentials = job.getCredentials();
246        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
247        assertTrue(tokens.isEmpty());
248      }
249    } finally {
250      kdc.stop();
251    }
252  }
253
254  @Test
255  @SuppressWarnings("unchecked")
256  public void testInitCredentialsForCluster4() throws Exception {
257    HBaseTestingUtil util1 = new HBaseTestingUtil();
258    // Assume util1 is insecure cluster
259    // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once
260
261    HBaseTestingUtil util2 = new HBaseTestingUtil();
262    File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
263    MiniKdc kdc = util2.setupMiniKdc(keytab);
264    try {
265      String username = UserGroupInformation.getLoginUser().getShortUserName();
266      String userPrincipal = username + "/localhost";
267      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
268      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
269
270      try (Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
271        Configuration conf1 = util1.getConfiguration();
272        Job job = Job.getInstance(conf1);
273
274        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
275
276        Credentials credentials = job.getCredentials();
277        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
278        assertEquals(1, tokens.size());
279
280        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
281        Token<AuthenticationTokenIdentifier> tokenForCluster =
282          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
283        assertEquals(userPrincipal + '@' + kdc.getRealm(),
284          tokenForCluster.decodeIdentifier().getUsername());
285      }
286    } finally {
287      kdc.stop();
288    }
289  }
290}