/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.Closeable;
import java.io.File;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test different variants of the initTableMapperJob and initCredentialsForCluster methods.
 */
@Category({ MapReduceTests.class, MediumTests.class })
public class TestTableMapReduceUtil {
  private static final String HTTP_PRINCIPAL = "HTTP/localhost";

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);

  /*
   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the
   * method depends on an online cluster.
   */

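  /*
   * The testInitTableMapperJob variants below cover the String and byte[] table name overloads,
   * with and without an explicit InputFormat class. The key/value classes passed in configure
   * the map output, so the job-level output key and value classes are expected to stay at the
   * Hadoop defaults (LongWritable and Text), which is what the assertions verify.
   */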
  @Test
  public void testInitTableMapperJob1() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    // test
    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
      Text.class, job, false, WALInputFormat.class);
    assertEquals(WALInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  @Test
  public void testInitTableMapperJob2() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false, WALInputFormat.class);
    assertEquals(WALInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  @Test
  public void testInitTableMapperJob3() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job);
    assertEquals(TableInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  @Test
  public void testInitTableMapperJob4() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false);
    assertEquals(TableInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

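  /*
   * Both clusters are insecure: initCredentialsForCluster should not obtain any delegation
   * token, so the job credentials stay empty.
   */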
  @Test
  public void testInitCredentialsForCluster1() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    util1.startMiniCluster();
    try {
      util2.startMiniCluster();
      try {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      } finally {
        util2.shutdownMiniCluster();
      }
    } finally {
      util1.shutdownMiniCluster();
    }
  }

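  /*
   * Both clusters are secure: initCredentialsForCluster should obtain an authentication token
   * for the remote (util2) cluster and add it to the job credentials, keyed by that cluster's id.
   */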
  @Test
  @SuppressWarnings("unchecked")
  public void testInitCredentialsForCluster2() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String username = UserGroupInformation.getLoginUser().getShortUserName();
      String userPrincipal = username + "/localhost";
      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertEquals(1, tokens.size());

        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
        Token<AuthenticationTokenIdentifier> tokenForCluster =
          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
        assertEquals(userPrincipal + '@' + kdc.getRealm(),
          tokenForCluster.decodeIdentifier().getUsername());
      }
    } finally {
      kdc.stop();
    }
  }

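  /*
   * Secure source cluster, insecure remote cluster: no token should be obtained for the remote
   * cluster, so the job credentials stay empty.
   */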
  @Test
  public void testInitCredentialsForCluster3() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String username = UserGroupInformation.getLoginUser().getShortUserName();
      String userPrincipal = username + "/localhost";
      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        HBaseTestingUtil util2 = new HBaseTestingUtil();
        // Assume util2 is an insecure cluster. Do not start util2, because a secure mini
        // cluster and an insecure mini cluster cannot be booted at the same time.

        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      }
    } finally {
      kdc.stop();
    }
  }

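  /*
   * Insecure source cluster, secure remote cluster: a token for the secure remote (util2)
   * cluster should still be obtained and added to the job credentials.
   */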
  @Test
  @SuppressWarnings("unchecked")
  public void testInitCredentialsForCluster4() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    // Assume util1 is an insecure cluster. Do not start util1, because a secure mini cluster
    // and an insecure mini cluster cannot be booted at the same time.

    HBaseTestingUtil util2 = new HBaseTestingUtil();
    File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util2.setupMiniKdc(keytab);
    try {
      String username = UserGroupInformation.getLoginUser().getShortUserName();
      String userPrincipal = username + "/localhost";
      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

      try (Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertEquals(1, tokens.size());

        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
        Token<AuthenticationTokenIdentifier> tokenForCluster =
          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
        assertEquals(userPrincipal + '@' + kdc.getRealm(),
          tokenForCluster.decodeIdentifier().getUsername());
      }
    } finally {
      kdc.stop();
    }
  }

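  /*
   * Same secure/secure setup as testInitCredentialsForCluster2, but the remote cluster is
   * identified by a connection URI rather than by the Configuration passed in.
   */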
  @Test
  @SuppressWarnings("unchecked")
  public void testInitCredentialsForClusterUri() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String username = UserGroupInformation.getLoginUser().getShortUserName();
      String userPrincipal = username + "/localhost";
      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        // Use the Configuration from util1 and the URI from util2, to make sure that the URI is
        // used instead of relying on the Configuration.
        TableMapReduceUtil.initCredentialsForCluster(job, util1.getConfiguration(),
          new URI(util2.getRpcConnnectionURI()));

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertEquals(1, tokens.size());

        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
        Token<AuthenticationTokenIdentifier> tokenForCluster =
          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
        assertEquals(userPrincipal + '@' + kdc.getRealm(),
          tokenForCluster.decodeIdentifier().getUsername());
      }
    } finally {
      kdc.stop();
    }
  }
}