001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.Closeable; 026import java.io.File; 027import java.net.URI; 028import java.util.Collection; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.client.Scan; 033import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders; 034import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; 035import org.apache.hadoop.hbase.testclassification.MapReduceTests; 036import org.apache.hadoop.hbase.testclassification.MediumTests; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 039import org.apache.hadoop.io.LongWritable; 040import org.apache.hadoop.io.Text; 041import org.apache.hadoop.mapreduce.Job; 042import org.apache.hadoop.minikdc.MiniKdc; 043import org.apache.hadoop.security.Credentials; 044import org.apache.hadoop.security.UserGroupInformation; 045import org.apache.hadoop.security.token.Token; 046import org.apache.hadoop.security.token.TokenIdentifier; 047import org.junit.After; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051 052/** 053 * Test different variants of initTableMapperJob method 054 */ 055@Category({ MapReduceTests.class, MediumTests.class }) 056public class TestTableMapReduceUtil { 057 private static final String HTTP_PRINCIPAL = "HTTP/localhost"; 058 059 @ClassRule 060 public static final HBaseClassTestRule CLASS_RULE = 061 HBaseClassTestRule.forClass(TestTableMapReduceUtil.class); 062 063 @After 064 public void after() { 065 SaslClientAuthenticationProviders.reset(); 066 } 067 068 /* 069 * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method 070 * depends on an online cluster. 071 */ 072 073 @Test 074 public void testInitTableMapperJob1() throws Exception { 075 Configuration configuration = new Configuration(); 076 Job job = Job.getInstance(configuration, "tableName"); 077 // test 078 TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class, 079 Text.class, job, false, WALInputFormat.class); 080 assertEquals(WALInputFormat.class, job.getInputFormatClass()); 081 assertEquals(Import.Importer.class, job.getMapperClass()); 082 assertEquals(LongWritable.class, job.getOutputKeyClass()); 083 assertEquals(Text.class, job.getOutputValueClass()); 084 assertNull(job.getCombinerClass()); 085 assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); 086 } 087 088 @Test 089 public void testInitTableMapperJob2() throws Exception { 090 Configuration configuration = new Configuration(); 091 Job job = Job.getInstance(configuration, "tableName"); 092 TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class, 093 Text.class, Text.class, job, false, WALInputFormat.class); 094 assertEquals(WALInputFormat.class, job.getInputFormatClass()); 095 assertEquals(Import.Importer.class, job.getMapperClass()); 096 assertEquals(LongWritable.class, job.getOutputKeyClass()); 097 assertEquals(Text.class, job.getOutputValueClass()); 098 assertNull(job.getCombinerClass()); 099 assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); 100 } 101 102 @Test 103 public void testInitTableMapperJob3() throws Exception { 104 Configuration configuration = new Configuration(); 105 Job job = Job.getInstance(configuration, "tableName"); 106 TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class, 107 Text.class, Text.class, job); 108 assertEquals(TableInputFormat.class, job.getInputFormatClass()); 109 assertEquals(Import.Importer.class, job.getMapperClass()); 110 assertEquals(LongWritable.class, job.getOutputKeyClass()); 111 assertEquals(Text.class, job.getOutputValueClass()); 112 assertNull(job.getCombinerClass()); 113 assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); 114 } 115 116 @Test 117 public void testInitTableMapperJob4() throws Exception { 118 Configuration configuration = new Configuration(); 119 Job job = Job.getInstance(configuration, "tableName"); 120 TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class, 121 Text.class, Text.class, job, false); 122 assertEquals(TableInputFormat.class, job.getInputFormatClass()); 123 assertEquals(Import.Importer.class, job.getMapperClass()); 124 assertEquals(LongWritable.class, job.getOutputKeyClass()); 125 assertEquals(Text.class, job.getOutputValueClass()); 126 assertNull(job.getCombinerClass()); 127 assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); 128 } 129 130 @Test 131 public void testInitCredentialsForCluster1() throws Exception { 132 HBaseTestingUtil util1 = new HBaseTestingUtil(); 133 HBaseTestingUtil util2 = new HBaseTestingUtil(); 134 135 util1.startMiniCluster(); 136 try { 137 util2.startMiniCluster(); 138 try { 139 Configuration conf1 = util1.getConfiguration(); 140 Job job = Job.getInstance(conf1); 141 142 TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration()); 143 144 Credentials credentials = job.getCredentials(); 145 Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); 146 assertTrue(tokens.isEmpty()); 147 } finally { 148 util2.shutdownMiniCluster(); 149 } 150 } finally { 151 util1.shutdownMiniCluster(); 152 } 153 } 154 155 @Test 156 @SuppressWarnings("unchecked") 157 public void testInitCredentialsForCluster2() throws Exception { 158 HBaseTestingUtil util1 = new HBaseTestingUtil(); 159 HBaseTestingUtil util2 = new HBaseTestingUtil(); 160 161 File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath()); 162 MiniKdc kdc = util1.setupMiniKdc(keytab); 163 try { 164 String username = UserGroupInformation.getLoginUser().getShortUserName(); 165 String userPrincipal = username + "/localhost"; 166 kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); 167 loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); 168 169 try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL); 170 Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) { 171 Configuration conf1 = util1.getConfiguration(); 172 Job job = Job.getInstance(conf1); 173 174 TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration()); 175 176 Credentials credentials = job.getCredentials(); 177 Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); 178 assertEquals(1, tokens.size()); 179 180 String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher()); 181 Token<AuthenticationTokenIdentifier> tokenForCluster = 182 (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId)); 183 assertEquals(userPrincipal + '@' + kdc.getRealm(), 184 tokenForCluster.decodeIdentifier().getUsername()); 185 } 186 } finally { 187 kdc.stop(); 188 } 189 } 190 191 @Test 192 public void testInitCredentialsForCluster3() throws Exception { 193 HBaseTestingUtil util1 = new HBaseTestingUtil(); 194 195 File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath()); 196 MiniKdc kdc = util1.setupMiniKdc(keytab); 197 try { 198 String username = UserGroupInformation.getLoginUser().getShortUserName(); 199 String userPrincipal = username + "/localhost"; 200 kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); 201 loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); 202 203 try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) { 204 HBaseTestingUtil util2 = new HBaseTestingUtil(); 205 // Assume util2 is insecure cluster 206 // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at 207 // once 208 209 Configuration conf1 = util1.getConfiguration(); 210 Job job = Job.getInstance(conf1); 211 212 TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration()); 213 214 Credentials credentials = job.getCredentials(); 215 Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); 216 assertTrue(tokens.isEmpty()); 217 } 218 } finally { 219 kdc.stop(); 220 } 221 } 222 223 @Test 224 @SuppressWarnings("unchecked") 225 public void testInitCredentialsForCluster4() throws Exception { 226 HBaseTestingUtil util1 = new HBaseTestingUtil(); 227 // Assume util1 is insecure cluster 228 // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once 229 230 HBaseTestingUtil util2 = new HBaseTestingUtil(); 231 File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath()); 232 MiniKdc kdc = util2.setupMiniKdc(keytab); 233 try { 234 String username = UserGroupInformation.getLoginUser().getShortUserName(); 235 String userPrincipal = username + "/localhost"; 236 kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); 237 loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); 238 239 try (Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) { 240 Configuration conf1 = util1.getConfiguration(); 241 Job job = Job.getInstance(conf1); 242 243 TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration()); 244 245 Credentials credentials = job.getCredentials(); 246 Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); 247 assertEquals(1, tokens.size()); 248 249 String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher()); 250 Token<AuthenticationTokenIdentifier> tokenForCluster = 251 (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId)); 252 assertEquals(userPrincipal + '@' + kdc.getRealm(), 253 tokenForCluster.decodeIdentifier().getUsername()); 254 } 255 } finally { 256 kdc.stop(); 257 } 258 } 259 260 @Test 261 @SuppressWarnings("unchecked") 262 public void testInitCredentialsForClusterUri() throws Exception { 263 HBaseTestingUtil util1 = new HBaseTestingUtil(); 264 HBaseTestingUtil util2 = new HBaseTestingUtil(); 265 266 File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath()); 267 MiniKdc kdc = util1.setupMiniKdc(keytab); 268 try { 269 String username = UserGroupInformation.getLoginUser().getShortUserName(); 270 String userPrincipal = username + "/localhost"; 271 kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); 272 loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); 273 274 try (Closeable ignored1 = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL); 275 Closeable ignored2 = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) { 276 Configuration conf1 = util1.getConfiguration(); 277 Job job = Job.getInstance(conf1); 278 279 // use Configuration from util1 and URI from util2, to make sure that we use the URI instead 280 // of rely on the Configuration 281 TableMapReduceUtil.initCredentialsForCluster(job, util1.getConfiguration(), 282 new URI(util2.getRpcConnnectionURI())); 283 284 Credentials credentials = job.getCredentials(); 285 Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); 286 assertEquals(1, tokens.size()); 287 288 String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher()); 289 Token<AuthenticationTokenIdentifier> tokenForCluster = 290 (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId)); 291 assertEquals(userPrincipal + '@' + kdc.getRealm(), 292 tokenForCluster.decodeIdentifier().getUsername()); 293 } 294 } finally { 295 kdc.stop(); 296 } 297 } 298}