/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.Closeable;
import java.io.File;
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.PermissionStorage;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.TokenProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for {@link TableMapReduceUtil}: the different variants of the initTableMapperJob method
 * and the initCredentialsForCluster method against combinations of secure and insecure clusters.
 */
@Category({ MapReduceTests.class, MediumTests.class })
public class TestTableMapReduceUtil {
  private static final String HTTP_PRINCIPAL = "HTTP/localhost";

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);

  /** Resets the SASL client authentication providers after every test. */
  @After
  public void after() {
    SaslClientAuthenticationProviders.reset();
  }

  /**
   * Asserts the job state that every initTableMapperJob variant in this class is expected to
   * produce for table "Table" with {@link Import.Importer} as the mapper.
   * @param job                 the job that was passed to initTableMapperJob
   * @param expectedInputFormat the input format class the job must report
   * @throws Exception if the job cannot resolve one of its configured classes
   */
  private static void assertMapperJobConfigured(Job job, Class<?> expectedInputFormat)
    throws Exception {
    assertEquals(expectedInputFormat, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    // The tests pass Text as the mapper output key class, yet the job-level output key class is
    // expected to stay LongWritable. NOTE(review): presumably initTableMapperJob only sets the
    // *map* output classes and this is the Hadoop default -- confirm against TableMapReduceUtil.
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * Resolves the local keytab file located in the "keytab" data test directory of the given
   * testing utility.
   * @param util the testing utility whose data test directory is used
   * @return the keytab file handle
   */
  private static File keytabFileFor(HBaseTestingUtil util) {
    return new File(util.getDataTestDir("keytab").toUri().getPath());
  }

  /**
   * Creates a "&lt;current short user name&gt;/localhost" principal together with the HTTP
   * principal in the given KDC, stores them in the keytab and logs the process in from that
   * keytab.
   * @param kdc    the running mini KDC
   * @param keytab the keytab file the principals are written to and the login is performed from
   * @return the created user principal without the realm suffix
   * @throws Exception if the principals cannot be created or the login fails
   */
  private static String loginAsNewPrincipal(MiniKdc kdc, File keytab) throws Exception {
    String username = UserGroupInformation.getLoginUser().getShortUserName();
    String userPrincipal = username + "/localhost";
    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
    return userPrincipal;
  }

  /**
   * Asserts that the job carries no tokens at all.
   * @param job the job whose credentials are inspected
   */
  private static void assertNoTokens(Job job) {
    Credentials credentials = job.getCredentials();
    Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
    assertTrue(tokens.isEmpty());
  }

  /**
   * Asserts that the job carries exactly one token, that it is stored under the cluster id of the
   * given cluster, and that it was issued for the expected user.
   * @param job              the job whose credentials are inspected
   * @param cluster          the running cluster the token is expected to belong to
   * @param expectedUsername the full principal (including realm) expected in the token identifier
   * @throws Exception if the cluster id cannot be read or the token identifier cannot be decoded
   */
  @SuppressWarnings("unchecked")
  private static void assertSingleTokenForCluster(Job job, HBaseTestingUtil cluster,
    String expectedUsername) throws Exception {
    Credentials credentials = job.getCredentials();
    Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
    assertEquals(1, tokens.size());

    String clusterId = ZKClusterId.readClusterIdZNode(cluster.getZooKeeperWatcher());
    Token<AuthenticationTokenIdentifier> tokenForCluster =
      (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
    // Fail with a readable message instead of a NullPointerException when the single token was
    // stored under a different alias, i.e. obtained for another cluster.
    assertTrue("No token registered under cluster id " + clusterId, tokenForCluster != null);
    assertEquals(expectedUsername, tokenForCluster.decodeIdentifier().getUsername());
  }

  /*
   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method
   * depends on an online cluster.
   */

  /** Variant taking the table name as a String plus an explicit input format class. */
  @Test
  public void testInitTableMapperJob1() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    // test
    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
      Text.class, job, false, WALInputFormat.class);
    assertMapperJobConfigured(job, WALInputFormat.class);
  }

  /** Variant taking the table name as bytes plus an explicit input format class. */
  @Test
  public void testInitTableMapperJob2() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false, WALInputFormat.class);
    assertMapperJobConfigured(job, WALInputFormat.class);
  }

  /** Shortest variant; the input format is expected to be {@link TableInputFormat}. */
  @Test
  public void testInitTableMapperJob3() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job);
    assertMapperJobConfigured(job, TableInputFormat.class);
  }

  /**
   * Variant with the extra boolean flag set to false; the input format is expected to be
   * {@link TableInputFormat}.
   */
  @Test
  public void testInitTableMapperJob4() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false);
    assertMapperJobConfigured(job, TableInputFormat.class);
  }

  /**
   * Configures the given testing utility for a Kerberos-secured cluster, starts the mini cluster
   * and waits until the regions of the ACL table are assigned.
   * @param util      the testing utility to configure and start
   * @param kdc       the running mini KDC providing the realm
   * @param principal the user principal (without realm) the cluster is configured with
   * @return a handle whose {@code close()} shuts the mini cluster down
   * @throws Exception if the cluster fails to start or the ACL table does not come online; in the
   *                   latter case the cluster is shut down before the exception is rethrown
   */
  private static Closeable startSecureMiniCluster(HBaseTestingUtil util, MiniKdc kdc,
    String principal) throws Exception {
    Configuration conf = util.getConfiguration();

    SecureTestUtil.enableSecurity(conf);
    VisibilityTestUtil.enableVisiblityLabels(conf);
    SecureTestUtil.verifyConfiguration(conf);

    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      AccessController.class.getName() + ',' + TokenProvider.class.getName());

    HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(),
      HTTP_PRINCIPAL + '@' + kdc.getRealm());

    util.startMiniCluster();
    try {
      util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
    } catch (Exception e) {
      // Do not leak a running cluster when it never becomes usable.
      util.shutdownMiniCluster();
      throw e;
    }

    return util::shutdownMiniCluster;
  }

  /** Both clusters are started without security: no token is expected. */
  @Test
  public void testInitCredentialsForCluster1() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    util1.startMiniCluster();
    try {
      util2.startMiniCluster();
      try {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        assertNoTokens(job);
      } finally {
        util2.shutdownMiniCluster();
      }
    } finally {
      util1.shutdownMiniCluster();
    }
  }

  /** Both clusters are secure: exactly one token for the second cluster is expected. */
  @Test
  public void testInitCredentialsForCluster2() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = keytabFileFor(util1);
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String userPrincipal = loginAsNewPrincipal(kdc, keytab);

      try (Closeable ignored1 = startSecureMiniCluster(util1, kdc, userPrincipal);
        Closeable ignored2 = startSecureMiniCluster(util2, kdc, userPrincipal)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        assertSingleTokenForCluster(job, util2, userPrincipal + '@' + kdc.getRealm());
      }
    } finally {
      kdc.stop();
    }
  }

  /** The job's cluster is secure, the target cluster is insecure: no token is expected. */
  @Test
  public void testInitCredentialsForCluster3() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();

    File keytab = keytabFileFor(util1);
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String userPrincipal = loginAsNewPrincipal(kdc, keytab);

      try (Closeable ignored1 = startSecureMiniCluster(util1, kdc, userPrincipal)) {
        HBaseTestingUtil util2 = new HBaseTestingUtil();
        // Assume util2 is insecure cluster
        // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
        // once

        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        assertNoTokens(job);
      }
    } finally {
      kdc.stop();
    }
  }

  /** The job's cluster is insecure, the target cluster is secure: one token is expected. */
  @Test
  public void testInitCredentialsForCluster4() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    // Assume util1 is insecure cluster
    // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once

    HBaseTestingUtil util2 = new HBaseTestingUtil();
    File keytab = keytabFileFor(util2);
    MiniKdc kdc = util2.setupMiniKdc(keytab);
    try {
      String userPrincipal = loginAsNewPrincipal(kdc, keytab);

      try (Closeable ignored2 = startSecureMiniCluster(util2, kdc, userPrincipal)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        assertSingleTokenForCluster(job, util2, userPrincipal + '@' + kdc.getRealm());
      }
    } finally {
      kdc.stop();
    }
  }

  /**
   * Both clusters are secure and the target cluster is addressed by connection URI: exactly one
   * token for the second cluster is expected.
   */
  @Test
  public void testInitCredentialsForClusterUri() throws Exception {
    HBaseTestingUtil util1 = new HBaseTestingUtil();
    HBaseTestingUtil util2 = new HBaseTestingUtil();

    File keytab = keytabFileFor(util1);
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    try {
      String userPrincipal = loginAsNewPrincipal(kdc, keytab);

      try (Closeable ignored1 = startSecureMiniCluster(util1, kdc, userPrincipal);
        Closeable ignored2 = startSecureMiniCluster(util2, kdc, userPrincipal)) {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        // use Configuration from util1 and URI from util2, to make sure that we use the URI instead
        // of rely on the Configuration
        TableMapReduceUtil.initCredentialsForCluster(job, util1.getConfiguration(),
          new URI(util2.getRpcConnnectionURI()));

        assertSingleTokenForCluster(job, util2, userPrincipal + '@' + kdc.getRealm());
      }
    } finally {
      kdc.stop();
    }
  }
}