001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import java.io.Closeable;
026import java.io.File;
027import java.net.URI;
028import java.util.Collection;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.client.Scan;
033import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
034import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
035import org.apache.hadoop.hbase.testclassification.MapReduceTests;
036import org.apache.hadoop.hbase.testclassification.MediumTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
039import org.apache.hadoop.io.LongWritable;
040import org.apache.hadoop.io.Text;
041import org.apache.hadoop.mapreduce.Job;
042import org.apache.hadoop.minikdc.MiniKdc;
043import org.apache.hadoop.security.Credentials;
044import org.apache.hadoop.security.UserGroupInformation;
045import org.apache.hadoop.security.token.Token;
046import org.apache.hadoop.security.token.TokenIdentifier;
047import org.junit.After;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051
052/**
053 * Test different variants of initTableMapperJob method
054 */
055@Category({ MapReduceTests.class, MediumTests.class })
056public class TestTableMapReduceUtil {
057  private static final String HTTP_PRINCIPAL = "HTTP/localhost";
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);
062
063  @After
064  public void after() {
065    SaslClientAuthenticationProviders.reset();
066  }
067
068  /*
069   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method
070   * depends on an online cluster.
071   */
072
073  @Test
074  public void testInitTableMapperJob1() throws Exception {
075    Configuration configuration = new Configuration();
076    Job job = Job.getInstance(configuration, "tableName");
077    // test
078    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
079      Text.class, job, false, WALInputFormat.class);
080    assertEquals(WALInputFormat.class, job.getInputFormatClass());
081    assertEquals(Import.Importer.class, job.getMapperClass());
082    assertEquals(LongWritable.class, job.getOutputKeyClass());
083    assertEquals(Text.class, job.getOutputValueClass());
084    assertNull(job.getCombinerClass());
085    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
086  }
087
088  @Test
089  public void testInitTableMapperJob2() throws Exception {
090    Configuration configuration = new Configuration();
091    Job job = Job.getInstance(configuration, "tableName");
092    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
093      Text.class, Text.class, job, false, WALInputFormat.class);
094    assertEquals(WALInputFormat.class, job.getInputFormatClass());
095    assertEquals(Import.Importer.class, job.getMapperClass());
096    assertEquals(LongWritable.class, job.getOutputKeyClass());
097    assertEquals(Text.class, job.getOutputValueClass());
098    assertNull(job.getCombinerClass());
099    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
100  }
101
102  @Test
103  public void testInitTableMapperJob3() throws Exception {
104    Configuration configuration = new Configuration();
105    Job job = Job.getInstance(configuration, "tableName");
106    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
107      Text.class, Text.class, job);
108    assertEquals(TableInputFormat.class, job.getInputFormatClass());
109    assertEquals(Import.Importer.class, job.getMapperClass());
110    assertEquals(LongWritable.class, job.getOutputKeyClass());
111    assertEquals(Text.class, job.getOutputValueClass());
112    assertNull(job.getCombinerClass());
113    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
114  }
115
116  @Test
117  public void testInitTableMapperJob4() throws Exception {
118    Configuration configuration = new Configuration();
119    Job job = Job.getInstance(configuration, "tableName");
120    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
121      Text.class, Text.class, job, false);
122    assertEquals(TableInputFormat.class, job.getInputFormatClass());
123    assertEquals(Import.Importer.class, job.getMapperClass());
124    assertEquals(LongWritable.class, job.getOutputKeyClass());
125    assertEquals(Text.class, job.getOutputValueClass());
126    assertNull(job.getCombinerClass());
127    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
128  }
129
130  @Test
131  public void testInitCredentialsForCluster1() throws Exception {
132    HBaseTestingUtil util1 = new HBaseTestingUtil();
133    HBaseTestingUtil util2 = new HBaseTestingUtil();
134
135    util1.startMiniCluster();
136    try {
137      util2.startMiniCluster();
138      try {
139        Configuration conf1 = util1.getConfiguration();
140        Job job = Job.getInstance(conf1);
141
142        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
143
144        Credentials credentials = job.getCredentials();
145        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
146        assertTrue(tokens.isEmpty());
147      } finally {
148        util2.shutdownMiniCluster();
149      }
150    } finally {
151      util1.shutdownMiniCluster();
152    }
153  }
154
155  @Test
156  @SuppressWarnings("unchecked")
157  public void testInitCredentialsForCluster2() throws Exception {
158    HBaseTestingUtil util1 = new HBaseTestingUtil();
159    HBaseTestingUtil util2 = new HBaseTestingUtil();
160
161    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
162    MiniKdc kdc = util1.setupMiniKdc(keytab);
163    try {
164      String username = UserGroupInformation.getLoginUser().getShortUserName();
165      String userPrincipal = username + "/localhost";
166      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
167      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
168
169      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
170        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
171        Configuration conf1 = util1.getConfiguration();
172        Job job = Job.getInstance(conf1);
173
174        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
175
176        Credentials credentials = job.getCredentials();
177        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
178        assertEquals(1, tokens.size());
179
180        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
181        Token<AuthenticationTokenIdentifier> tokenForCluster =
182          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
183        assertEquals(userPrincipal + '@' + kdc.getRealm(),
184          tokenForCluster.decodeIdentifier().getUsername());
185      }
186    } finally {
187      kdc.stop();
188    }
189  }
190
191  @Test
192  public void testInitCredentialsForCluster3() throws Exception {
193    HBaseTestingUtil util1 = new HBaseTestingUtil();
194
195    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
196    MiniKdc kdc = util1.setupMiniKdc(keytab);
197    try {
198      String username = UserGroupInformation.getLoginUser().getShortUserName();
199      String userPrincipal = username + "/localhost";
200      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
201      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
202
203      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
204        HBaseTestingUtil util2 = new HBaseTestingUtil();
205        // Assume util2 is insecure cluster
206        // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
207        // once
208
209        Configuration conf1 = util1.getConfiguration();
210        Job job = Job.getInstance(conf1);
211
212        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
213
214        Credentials credentials = job.getCredentials();
215        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
216        assertTrue(tokens.isEmpty());
217      }
218    } finally {
219      kdc.stop();
220    }
221  }
222
223  @Test
224  @SuppressWarnings("unchecked")
225  public void testInitCredentialsForCluster4() throws Exception {
226    HBaseTestingUtil util1 = new HBaseTestingUtil();
227    // Assume util1 is insecure cluster
228    // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once
229
230    HBaseTestingUtil util2 = new HBaseTestingUtil();
231    File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
232    MiniKdc kdc = util2.setupMiniKdc(keytab);
233    try {
234      String username = UserGroupInformation.getLoginUser().getShortUserName();
235      String userPrincipal = username + "/localhost";
236      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
237      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
238
239      try (Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
240        Configuration conf1 = util1.getConfiguration();
241        Job job = Job.getInstance(conf1);
242
243        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
244
245        Credentials credentials = job.getCredentials();
246        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
247        assertEquals(1, tokens.size());
248
249        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
250        Token<AuthenticationTokenIdentifier> tokenForCluster =
251          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
252        assertEquals(userPrincipal + '@' + kdc.getRealm(),
253          tokenForCluster.decodeIdentifier().getUsername());
254      }
255    } finally {
256      kdc.stop();
257    }
258  }
259
260  @Test
261  @SuppressWarnings("unchecked")
262  public void testInitCredentialsForClusterUri() throws Exception {
263    HBaseTestingUtil util1 = new HBaseTestingUtil();
264    HBaseTestingUtil util2 = new HBaseTestingUtil();
265
266    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
267    MiniKdc kdc = util1.setupMiniKdc(keytab);
268    try {
269      String username = UserGroupInformation.getLoginUser().getShortUserName();
270      String userPrincipal = username + "/localhost";
271      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
272      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
273
274      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
275        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
276        Configuration conf1 = util1.getConfiguration();
277        Job job = Job.getInstance(conf1);
278
279        // use Configuration from util1 and URI from util2, to make sure that we use the URI instead
280        // of rely on the Configuration
281        TableMapReduceUtil.initCredentialsForCluster(job, util1.getConfiguration(),
282          new URI(util2.getRpcConnnectionURI()));
283
284        Credentials credentials = job.getCredentials();
285        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
286        assertEquals(1, tokens.size());
287
288        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
289        Token<AuthenticationTokenIdentifier> tokenForCluster =
290          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
291        assertEquals(userPrincipal + '@' + kdc.getRealm(),
292          tokenForCluster.decodeIdentifier().getUsername());
293      }
294    } finally {
295      kdc.stop();
296    }
297  }
298}