001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertNull;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.Closeable;
026import java.io.File;
027import java.net.URI;
028import java.util.Collection;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.testclassification.MapReduceTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
037import org.apache.hadoop.io.LongWritable;
038import org.apache.hadoop.io.Text;
039import org.apache.hadoop.mapreduce.Job;
040import org.apache.hadoop.minikdc.MiniKdc;
041import org.apache.hadoop.security.Credentials;
042import org.apache.hadoop.security.UserGroupInformation;
043import org.apache.hadoop.security.token.Token;
044import org.apache.hadoop.security.token.TokenIdentifier;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047
048/**
049 * Test different variants of initTableMapperJob method
050 */
051@Tag(MapReduceTests.TAG)
052@Tag(LargeTests.TAG)
053public class TestTableMapReduceUtil {
054  private static final String HTTP_PRINCIPAL = "HTTP/localhost";
055
056  /*
057   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method
058   * depends on an online cluster.
059   */
060
061  @Test
062  public void testInitTableMapperJob1() throws Exception {
063    Configuration configuration = new Configuration();
064    Job job = Job.getInstance(configuration, "tableName");
065    // test
066    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
067      Text.class, job, false, WALInputFormat.class);
068    assertEquals(WALInputFormat.class, job.getInputFormatClass());
069    assertEquals(Import.Importer.class, job.getMapperClass());
070    assertEquals(LongWritable.class, job.getOutputKeyClass());
071    assertEquals(Text.class, job.getOutputValueClass());
072    assertNull(job.getCombinerClass());
073    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
074  }
075
076  @Test
077  public void testInitTableMapperJob2() throws Exception {
078    Configuration configuration = new Configuration();
079    Job job = Job.getInstance(configuration, "tableName");
080    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
081      Text.class, Text.class, job, false, WALInputFormat.class);
082    assertEquals(WALInputFormat.class, job.getInputFormatClass());
083    assertEquals(Import.Importer.class, job.getMapperClass());
084    assertEquals(LongWritable.class, job.getOutputKeyClass());
085    assertEquals(Text.class, job.getOutputValueClass());
086    assertNull(job.getCombinerClass());
087    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
088  }
089
090  @Test
091  public void testInitTableMapperJob3() throws Exception {
092    Configuration configuration = new Configuration();
093    Job job = Job.getInstance(configuration, "tableName");
094    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
095      Text.class, Text.class, job);
096    assertEquals(TableInputFormat.class, job.getInputFormatClass());
097    assertEquals(Import.Importer.class, job.getMapperClass());
098    assertEquals(LongWritable.class, job.getOutputKeyClass());
099    assertEquals(Text.class, job.getOutputValueClass());
100    assertNull(job.getCombinerClass());
101    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
102  }
103
104  @Test
105  public void testInitTableMapperJob4() throws Exception {
106    Configuration configuration = new Configuration();
107    Job job = Job.getInstance(configuration, "tableName");
108    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
109      Text.class, Text.class, job, false);
110    assertEquals(TableInputFormat.class, job.getInputFormatClass());
111    assertEquals(Import.Importer.class, job.getMapperClass());
112    assertEquals(LongWritable.class, job.getOutputKeyClass());
113    assertEquals(Text.class, job.getOutputValueClass());
114    assertNull(job.getCombinerClass());
115    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
116  }
117
118  @Test
119  public void testInitCredentialsForCluster1() throws Exception {
120    HBaseTestingUtil util1 = new HBaseTestingUtil();
121    HBaseTestingUtil util2 = new HBaseTestingUtil();
122
123    util1.startMiniCluster();
124    try {
125      util2.startMiniCluster();
126      try {
127        Configuration conf1 = util1.getConfiguration();
128        Job job = Job.getInstance(conf1);
129
130        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
131
132        Credentials credentials = job.getCredentials();
133        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
134        assertTrue(tokens.isEmpty());
135      } finally {
136        util2.shutdownMiniCluster();
137      }
138    } finally {
139      util1.shutdownMiniCluster();
140    }
141  }
142
143  @Test
144  @SuppressWarnings("unchecked")
145  public void testInitCredentialsForCluster2() throws Exception {
146    HBaseTestingUtil util1 = new HBaseTestingUtil();
147    HBaseTestingUtil util2 = new HBaseTestingUtil();
148
149    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
150    MiniKdc kdc = util1.setupMiniKdc(keytab);
151    try {
152      String username = UserGroupInformation.getLoginUser().getShortUserName();
153      String userPrincipal = username + "/localhost";
154      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
155      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
156
157      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
158        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
159        Configuration conf1 = util1.getConfiguration();
160        Job job = Job.getInstance(conf1);
161
162        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
163
164        Credentials credentials = job.getCredentials();
165        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
166        assertEquals(1, tokens.size());
167
168        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
169        Token<AuthenticationTokenIdentifier> tokenForCluster =
170          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
171        assertEquals(userPrincipal + '@' + kdc.getRealm(),
172          tokenForCluster.decodeIdentifier().getUsername());
173      }
174    } finally {
175      kdc.stop();
176    }
177  }
178
179  @Test
180  public void testInitCredentialsForCluster3() throws Exception {
181    HBaseTestingUtil util1 = new HBaseTestingUtil();
182
183    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
184    MiniKdc kdc = util1.setupMiniKdc(keytab);
185    try {
186      String username = UserGroupInformation.getLoginUser().getShortUserName();
187      String userPrincipal = username + "/localhost";
188      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
189      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
190
191      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
192        HBaseTestingUtil util2 = new HBaseTestingUtil();
193        // Assume util2 is insecure cluster
194        // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
195        // once
196
197        Configuration conf1 = util1.getConfiguration();
198        Job job = Job.getInstance(conf1);
199
200        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
201
202        Credentials credentials = job.getCredentials();
203        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
204        assertTrue(tokens.isEmpty());
205      }
206    } finally {
207      kdc.stop();
208    }
209  }
210
211  @Test
212  @SuppressWarnings("unchecked")
213  public void testInitCredentialsForCluster4() throws Exception {
214    HBaseTestingUtil util1 = new HBaseTestingUtil();
215    // Assume util1 is insecure cluster
216    // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once
217
218    HBaseTestingUtil util2 = new HBaseTestingUtil();
219    File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
220    MiniKdc kdc = util2.setupMiniKdc(keytab);
221    try {
222      String username = UserGroupInformation.getLoginUser().getShortUserName();
223      String userPrincipal = username + "/localhost";
224      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
225      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
226
227      try (Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
228        Configuration conf1 = util1.getConfiguration();
229        Job job = Job.getInstance(conf1);
230
231        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
232
233        Credentials credentials = job.getCredentials();
234        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
235        assertEquals(1, tokens.size());
236
237        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
238        Token<AuthenticationTokenIdentifier> tokenForCluster =
239          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
240        assertEquals(userPrincipal + '@' + kdc.getRealm(),
241          tokenForCluster.decodeIdentifier().getUsername());
242      }
243    } finally {
244      kdc.stop();
245    }
246  }
247
248  @Test
249  @SuppressWarnings("unchecked")
250  public void testInitCredentialsForClusterUri() throws Exception {
251    HBaseTestingUtil util1 = new HBaseTestingUtil();
252    HBaseTestingUtil util2 = new HBaseTestingUtil();
253
254    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
255    MiniKdc kdc = util1.setupMiniKdc(keytab);
256    try {
257      String username = UserGroupInformation.getLoginUser().getShortUserName();
258      String userPrincipal = username + "/localhost";
259      kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
260      loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
261
262      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
263        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
264        Configuration conf1 = util1.getConfiguration();
265        Job job = Job.getInstance(conf1);
266
267        // use Configuration from util1 and URI from util2, to make sure that we use the URI instead
268        // of rely on the Configuration
269        TableMapReduceUtil.initCredentialsForCluster(job, util1.getConfiguration(),
270          new URI(util2.getRpcConnnectionURI()));
271
272        Credentials credentials = job.getCredentials();
273        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
274        assertEquals(1, tokens.size());
275
276        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
277        Token<AuthenticationTokenIdentifier> tokenForCluster =
278          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
279        assertEquals(userPrincipal + '@' + kdc.getRealm(),
280          tokenForCluster.decodeIdentifier().getUsername());
281      }
282    } finally {
283      kdc.stop();
284    }
285  }
286}