/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.Closeable;
import java.io.File;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Test different variants of initTableMapperJob method
 */
@Tag(MapReduceTests.TAG)
@Tag(LargeTests.TAG)
public class TestTableMapReduceUtil {
  private static final String HTTP_PRINCIPAL = "HTTP/localhost";

  /*
   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method
   * depends on an online cluster.
   */

  /**
   * Asserts the job settings shared by all initTableMapperJob variants: the expected input format,
   * the Importer mapper, default output key/value classes, no combiner, and the configured input
   * table name.
   * @param job the job configured by initTableMapperJob
   * @param expectedInputFormat the input format class the variant under test should have set
   */
  private static void assertJobConfigured(Job job, Class<?> expectedInputFormat) throws Exception {
    assertEquals(expectedInputFormat, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    // initTableMapperJob only sets the *map* output key class; the job-level output key class
    // therefore stays at Hadoop's default of LongWritable.
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * Creates the current login user's service principal (plus the HTTP principal) in the given KDC
   * and performs a keytab login as that user.
   * @param kdc the mini KDC to register the principals with
   * @param keytab the keytab file to write the principal keys into
   * @return the fully qualified principal that was logged in, i.e. {@code user/localhost@REALM}
   */
  private static String createAndLoginUser(MiniKdc kdc, File keytab) throws Exception {
    String username = UserGroupInformation.getLoginUser().getShortUserName();
    String userPrincipal = username + "/localhost";
    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
    String fullPrincipal = userPrincipal + '@' + kdc.getRealm();
    loginUserFromKeytab(fullPrincipal, keytab.getAbsolutePath());
    return fullPrincipal;
  }

  /**
   * Asserts that the job carries exactly one delegation token, keyed by the target cluster's id,
   * and that the token identifier decodes to the expected principal.
   * @param job the job whose credentials are inspected
   * @param targetUtil the testing util of the cluster the token was requested from
   * @param expectedPrincipal the full principal expected inside the token identifier
   */
  @SuppressWarnings("unchecked")
  private static void assertSingleTokenForCluster(Job job, HBaseTestingUtil targetUtil,
    String expectedPrincipal) throws Exception {
    Credentials credentials = job.getCredentials();
    Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
    assertEquals(1, tokens.size());

    // Tokens are keyed by the cluster id of the cluster that issued them.
    String clusterId = ZKClusterId.readClusterIdZNode(targetUtil.getZooKeeperWatcher());
    Token<AuthenticationTokenIdentifier> tokenForCluster =
      (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
    assertEquals(expectedPrincipal, tokenForCluster.decodeIdentifier().getUsername());
  }

  /** Variant taking a String table name plus an explicit input format class. */
  @Test
  public void testInitTableMapperJob1() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    // test
    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
      Text.class, job, false, WALInputFormat.class);
    assertJobConfigured(job, WALInputFormat.class);
  }

  /** Variant taking a byte[] table name plus an explicit input format class. */
  @Test
  public void testInitTableMapperJob2() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false, WALInputFormat.class);
    assertJobConfigured(job, WALInputFormat.class);
  }

  /** Variant taking a byte[] table name with the default TableInputFormat. */
  @Test
  public void testInitTableMapperJob3() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job);
    assertJobConfigured(job, TableInputFormat.class);
  }

  /** Variant taking a byte[] table name and addDependencyJars flag, default input format. */
  @Test
  public void testInitTableMapperJob4() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false);
    assertJobConfigured(job, TableInputFormat.class);
  }

  /**
   * Insecure source cluster, insecure target cluster: no token should be obtained.
   */
  @Test
  public void testInitCredentialsForCluster1() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    util1.startMiniCluster();
    try {
      util2.startMiniCluster();
      try {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      } finally {
        util2.shutdownMiniCluster();
      }
    } finally {
      util1.shutdownMiniCluster();
    }
  }

  /**
   * Secure source cluster, secure target cluster: exactly one token for the target cluster is
   * expected.
   */
  @Test
  public void testInitCredentialsForCluster2() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String fullPrincipal = createAndLoginUser(kdc, keytab);
      // Strip the realm back off: startSecureMiniCluster expects the short principal form.
      String userPrincipal = fullPrincipal.substring(0, fullPrincipal.indexOf('@'));

      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        assertSingleTokenForCluster(job, util2, fullPrincipal);
      }
    } finally {
      kdc.stop();
    }
  }

  /**
   * Secure source cluster, insecure target cluster: no token should be obtained.
   */
  @Test
  public void testInitCredentialsForCluster3() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String fullPrincipal = createAndLoginUser(kdc, keytab);
      String userPrincipal = fullPrincipal.substring(0, fullPrincipal.indexOf('@'));

      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        HBaseTestingUtil util2 = new HBaseTestingUtil();
        // Assume util2 is insecure cluster
        // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
        // once

        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      }
    } finally {
      kdc.stop();
    }
  }

  /**
   * Insecure source cluster, secure target cluster: exactly one token for the target cluster is
   * expected.
   */
  @Test
  public void testInitCredentialsForCluster4() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    // Assume util1 is insecure cluster
    // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once

    HBaseTestingUtil util2 = new HBaseTestingUtil();
    File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util2.setupMiniKdc(keytab);
    try {
      String fullPrincipal = createAndLoginUser(kdc, keytab);
      String userPrincipal = fullPrincipal.substring(0, fullPrincipal.indexOf('@'));

      try (Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        assertSingleTokenForCluster(job, util2, fullPrincipal);
      }
    } finally {
      kdc.stop();
    }
  }

  /**
   * Connection-URI variant: the URI argument must take precedence over the Configuration argument
   * when selecting the cluster to obtain a token from.
   */
  @Test
  public void testInitCredentialsForClusterUri() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String fullPrincipal = createAndLoginUser(kdc, keytab);
      String userPrincipal = fullPrincipal.substring(0, fullPrincipal.indexOf('@'));

      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        // use Configuration from util1 and URI from util2, to make sure that we use the URI instead
        // of rely on the Configuration
        TableMapReduceUtil.initCredentialsForCluster(job, util1.getConfiguration(),
          new URI(util2.getRpcConnnectionURI()));

        assertSingleTokenForCluster(job, util2, fullPrincipal);
      }
    } finally {
      kdc.stop();
    }
  }
}