001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.Closeable; 026import java.io.File; 027import java.util.Collection; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.client.Scan; 032import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 033import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 034import org.apache.hadoop.hbase.security.access.AccessController; 035import org.apache.hadoop.hbase.security.access.PermissionStorage; 036import org.apache.hadoop.hbase.security.access.SecureTestUtil; 037import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders; 038import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; 039import org.apache.hadoop.hbase.security.token.TokenProvider; 040import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; 041import org.apache.hadoop.hbase.testclassification.MapReduceTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 045import org.apache.hadoop.io.LongWritable; 046import org.apache.hadoop.io.Text; 047import org.apache.hadoop.mapreduce.Job; 048import org.apache.hadoop.minikdc.MiniKdc; 049import org.apache.hadoop.security.Credentials; 050import org.apache.hadoop.security.UserGroupInformation; 051import org.apache.hadoop.security.token.Token; 052import org.apache.hadoop.security.token.TokenIdentifier; 053import org.junit.After; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057 058/** 059 * Test different variants of initTableMapperJob method 060 */ 061@Category({ MapReduceTests.class, MediumTests.class }) 062public class TestTableMapReduceUtil { 063 private static final String HTTP_PRINCIPAL = "HTTP/localhost"; 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestTableMapReduceUtil.class); 068 069 @After 070 public void after() { 071 SaslClientAuthenticationProviders.reset(); 072 } 073 074 /* 075 * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method 076 * depends on an online cluster. 077 */ 078 079 @Test 080 public void testInitTableMapperJob1() throws Exception { 081 Configuration configuration = new Configuration(); 082 Job job = Job.getInstance(configuration, "tableName"); 083 // test 084 TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class, 085 Text.class, job, false, WALInputFormat.class); 086 assertEquals(WALInputFormat.class, job.getInputFormatClass()); 087 assertEquals(Import.Importer.class, job.getMapperClass()); 088 assertEquals(LongWritable.class, job.getOutputKeyClass()); 089 assertEquals(Text.class, job.getOutputValueClass()); 090 assertNull(job.getCombinerClass()); 091 assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); 092 } 093 094 @Test 095 public void testInitTableMapperJob2() throws Exception { 096 Configuration configuration = new Configuration(); 097 Job job = Job.getInstance(configuration, "tableName"); 098 TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class, 099 Text.class, Text.class, job, false, WALInputFormat.class); 100 assertEquals(WALInputFormat.class, job.getInputFormatClass()); 101 assertEquals(Import.Importer.class, job.getMapperClass()); 102 assertEquals(LongWritable.class, job.getOutputKeyClass()); 103 assertEquals(Text.class, job.getOutputValueClass()); 104 assertNull(job.getCombinerClass()); 105 assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); 106 } 107 108 @Test 109 public void testInitTableMapperJob3() throws Exception { 110 Configuration configuration = new Configuration(); 111 Job job = Job.getInstance(configuration, "tableName"); 112 TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class, 113 Text.class, Text.class, job); 114 assertEquals(TableInputFormat.class, job.getInputFormatClass()); 115 assertEquals(Import.Importer.class, job.getMapperClass()); 116 assertEquals(LongWritable.class, job.getOutputKeyClass()); 117 assertEquals(Text.class, job.getOutputValueClass()); 118 assertNull(job.getCombinerClass()); 119 assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); 120 } 121 122 @Test 123 public void testInitTableMapperJob4() throws Exception { 124 Configuration configuration = new Configuration(); 125 Job job = Job.getInstance(configuration, "tableName"); 126 TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class, 127 Text.class, Text.class, job, false); 128 assertEquals(TableInputFormat.class, job.getInputFormatClass()); 129 assertEquals(Import.Importer.class, job.getMapperClass()); 130 assertEquals(LongWritable.class, job.getOutputKeyClass()); 131 assertEquals(Text.class, job.getOutputValueClass()); 132 assertNull(job.getCombinerClass()); 133 assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); 134 } 135 136 private static Closeable startSecureMiniCluster(HBaseTestingUtil util, MiniKdc kdc, 137 String principal) throws Exception { 138 Configuration conf = util.getConfiguration(); 139 140 SecureTestUtil.enableSecurity(conf); 141 VisibilityTestUtil.enableVisiblityLabels(conf); 142 SecureTestUtil.verifyConfiguration(conf); 143 144 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 145 AccessController.class.getName() + ',' + TokenProvider.class.getName()); 146 147 HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(), 148 HTTP_PRINCIPAL + '@' + kdc.getRealm()); 149 150 util.startMiniCluster(); 151 try { 152 util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME); 153 } catch (Exception e) { 154 util.shutdownMiniCluster(); 155 throw e; 156 } 157 158 return util::shutdownMiniCluster; 159 } 160 161 @Test 162 public void testInitCredentialsForCluster1() throws Exception { 163 HBaseTestingUtil util1 = new HBaseTestingUtil(); 164 HBaseTestingUtil util2 = new HBaseTestingUtil(); 165 166 util1.startMiniCluster(); 167 try { 168 util2.startMiniCluster(); 169 try { 170 Configuration conf1 = util1.getConfiguration(); 171 Job job = Job.getInstance(conf1); 172 173 TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration()); 174 175 Credentials credentials = job.getCredentials(); 176 Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); 177 assertTrue(tokens.isEmpty()); 178 } finally { 179 util2.shutdownMiniCluster(); 180 } 181 } finally { 182 util1.shutdownMiniCluster(); 183 } 184 } 185 186 @Test 187 @SuppressWarnings("unchecked") 188 public void testInitCredentialsForCluster2() throws Exception { 189 HBaseTestingUtil util1 = new HBaseTestingUtil(); 190 HBaseTestingUtil util2 = new HBaseTestingUtil(); 191 192 File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath()); 193 MiniKdc kdc = util1.setupMiniKdc(keytab); 194 try { 195 String username = UserGroupInformation.getLoginUser().getShortUserName(); 196 String userPrincipal = username + "/localhost"; 197 kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); 198 loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); 199 200 try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal); 201 Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) { 202 Configuration conf1 = util1.getConfiguration(); 203 Job job = Job.getInstance(conf1); 204 205 TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration()); 206 207 Credentials credentials = job.getCredentials(); 208 Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); 209 assertEquals(1, tokens.size()); 210 211 String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher()); 212 Token<AuthenticationTokenIdentifier> tokenForCluster = 213 (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId)); 214 assertEquals(userPrincipal + '@' + kdc.getRealm(), 215 tokenForCluster.decodeIdentifier().getUsername()); 216 } 217 } finally { 218 kdc.stop(); 219 } 220 } 221 222 @Test 223 public void testInitCredentialsForCluster3() throws Exception { 224 HBaseTestingUtil util1 = new HBaseTestingUtil(); 225 226 File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath()); 227 MiniKdc kdc = util1.setupMiniKdc(keytab); 228 try { 229 String username = UserGroupInformation.getLoginUser().getShortUserName(); 230 String userPrincipal = username + "/localhost"; 231 kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); 232 loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); 233 234 try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal)) { 235 HBaseTestingUtil util2 = new HBaseTestingUtil(); 236 // Assume util2 is insecure cluster 237 // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at 238 // once 239 240 Configuration conf1 = util1.getConfiguration(); 241 Job job = Job.getInstance(conf1); 242 243 TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration()); 244 245 Credentials credentials = job.getCredentials(); 246 Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); 247 assertTrue(tokens.isEmpty()); 248 } 249 } finally { 250 kdc.stop(); 251 } 252 } 253 254 @Test 255 @SuppressWarnings("unchecked") 256 public void testInitCredentialsForCluster4() throws Exception { 257 HBaseTestingUtil util1 = new HBaseTestingUtil(); 258 // Assume util1 is insecure cluster 259 // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once 260 261 HBaseTestingUtil util2 = new HBaseTestingUtil(); 262 File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath()); 263 MiniKdc kdc = util2.setupMiniKdc(keytab); 264 try { 265 String username = UserGroupInformation.getLoginUser().getShortUserName(); 266 String userPrincipal = username + "/localhost"; 267 kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); 268 loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); 269 270 try (Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) { 271 Configuration conf1 = util1.getConfiguration(); 272 Job job = Job.getInstance(conf1); 273 274 TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration()); 275 276 Credentials credentials = job.getCredentials(); 277 Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); 278 assertEquals(1, tokens.size()); 279 280 String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher()); 281 Token<AuthenticationTokenIdentifier> tokenForCluster = 282 (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId)); 283 assertEquals(userPrincipal + '@' + kdc.getRealm(), 284 tokenForCluster.decodeIdentifier().getUsername()); 285 } 286 } finally { 287 kdc.stop(); 288 } 289 } 290}