/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.Closeable;
import java.io.File;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests the different variants of {@code TableMapReduceUtil.initTableMapperJob} and of
 * {@code TableMapReduceUtil.initCredentialsForCluster}.
 */
@Category({ MapReduceTests.class, MediumTests.class })
public class TestTableMapReduceUtil {
  private static final String HTTP_PRINCIPAL = "HTTP/localhost";

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);

  /*
   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method
   * depends on an online cluster.
   */

  /**
   * Registers the principal {@code <short name of the current login user>/localhost} together with
   * {@link #HTTP_PRINCIPAL} through {@code kdc.createPrincipal(keytab, ...)} and then logs in from
   * the keytab as that user principal, qualified with the KDC realm.
   * @param kdc    the KDC in which the principals are created
   * @param keytab the keytab file handed to the KDC and used for the login
   * @return the user principal without the {@code @REALM} suffix
   */
  private static String createPrincipalsAndLogin(MiniKdc kdc, File keytab) throws Exception {
    String username = UserGroupInformation.getLoginUser().getShortUserName();
    String userPrincipal = username + "/localhost";
    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
    return userPrincipal;
  }

  /**
   * Asserts that {@code credentials} holds exactly one token, that this token is stored under the
   * cluster id read from the ZooKeeper watcher of {@code cluster}, and that its identifier carries
   * {@code expectedUsername}.
   * @param credentials      the job credentials to inspect
   * @param cluster          the running cluster whose id is expected to be the token alias
   * @param expectedUsername the fully qualified principal expected in the token identifier
   */
  // The cast is unchecked because Credentials#getToken returns a wildcard-typed token; the
  // identifier type is checked at runtime when decodeIdentifier().getUsername() is invoked.
  @SuppressWarnings("unchecked")
  private static void assertSingleTokenForCluster(Credentials credentials,
    HBaseTestingUtil cluster, String expectedUsername) throws Exception {
    Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
    assertEquals(1, tokens.size());

    String clusterId = ZKClusterId.readClusterIdZNode(cluster.getZooKeeperWatcher());
    Token<AuthenticationTokenIdentifier> tokenForCluster =
      (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
    // Fail with a readable message instead of a NullPointerException on the next line.
    assertNotNull("No token stored under cluster id " + clusterId, tokenForCluster);
    assertEquals(expectedUsername, tokenForCluster.decodeIdentifier().getUsername());
  }

  /**
   * Variant taking a String table name, the addDependencyJars flag and an explicit input format
   * class. Note that the job's output key class is expected to be {@code LongWritable} although
   * {@code Text} is passed as the mapper output key class (presumably the utility only sets the
   * map output classes; confirm against TableMapReduceUtil).
   */
  @Test
  public void testInitTableMapperJob1() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    // test
    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
      Text.class, job, false, WALInputFormat.class);
    assertEquals(WALInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * Same expectations as {@link #testInitTableMapperJob1()}, but the table name is passed as a
   * byte array.
   */
  @Test
  public void testInitTableMapperJob2() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false, WALInputFormat.class);
    assertEquals(WALInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * Shortest byte-array variant: no input format class is given, so {@code TableInputFormat} is
   * expected on the job.
   */
  @Test
  public void testInitTableMapperJob3() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job);
    assertEquals(TableInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * Byte-array variant with the addDependencyJars flag but without an input format class;
   * {@code TableInputFormat} is expected on the job.
   */
  @Test
  public void testInitTableMapperJob4() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false);
    assertEquals(TableInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * Both clusters are started with plain {@code startMiniCluster()}; no token is expected in the
   * job credentials.
   */
  @Test
  public void testInitCredentialsForCluster1() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    util1.startMiniCluster();
    try {
      util2.startMiniCluster();
      try {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      } finally {
        util2.shutdownMiniCluster();
      }
    } finally {
      util1.shutdownMiniCluster();
    }
  }

  /**
   * Both clusters are started as secure mini clusters against the same KDC; exactly one token,
   * stored under the id of the second cluster, is expected in the job credentials.
   */
  @Test
  public void testInitCredentialsForCluster2() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String userPrincipal = createPrincipalsAndLogin(kdc, keytab);

      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        assertSingleTokenForCluster(job.getCredentials(), util2,
          userPrincipal + '@' + kdc.getRealm());
      }
    } finally {
      kdc.stop();
    }
  }

  /**
   * The job's own cluster is secure while the target configuration comes from an unstarted
   * (assumed insecure) testing util; no token is expected in the job credentials.
   */
  @Test
  public void testInitCredentialsForCluster3() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String userPrincipal = createPrincipalsAndLogin(kdc, keytab);

      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        HBaseTestingUtil util2 = new HBaseTestingUtil();
        // Assume util2 is insecure cluster
        // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
        // once

        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      }
    } finally {
      kdc.stop();
    }
  }

  /**
   * The job's configuration comes from an unstarted (assumed insecure) testing util while the
   * target cluster is secure; exactly one token for the target cluster is expected.
   */
  @Test
  public void testInitCredentialsForCluster4() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    // Assume util1 is insecure cluster
    // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once

    HBaseTestingUtil util2 = new HBaseTestingUtil();
    File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util2.setupMiniKdc(keytab);
    try {
      String userPrincipal = createPrincipalsAndLogin(kdc, keytab);

      try (Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        assertSingleTokenForCluster(job.getCredentials(), util2,
          userPrincipal + '@' + kdc.getRealm());
      }
    } finally {
      kdc.stop();
    }
  }

  /**
   * Variant of {@code initCredentialsForCluster} that additionally takes a connection URI: the
   * configuration of the first cluster is combined with the URI of the second one, and the single
   * expected token must belong to the second cluster.
   */
  @Test
  public void testInitCredentialsForClusterUri() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String userPrincipal = createPrincipalsAndLogin(kdc, keytab);

      try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
        Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        // use Configuration from util1 and URI from util2, to make sure that we use the URI instead
        // of relying on the Configuration
        TableMapReduceUtil.initCredentialsForCluster(job, util1.getConfiguration(),
          new URI(util2.getRpcConnnectionURI()));

        assertSingleTokenForCluster(job.getCredentials(), util2,
          userPrincipal + '@' + kdc.getRealm());
      }
    } finally {
      kdc.stop();
    }
  }
}