/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.Closeable;
import java.io.File;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.access.AccessController;
import org.apache.hadoop.hbase.security.access.PermissionStorage;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.TokenProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test different variants of initTableMapperJob method, plus the behavior of
 * {@link TableMapReduceUtil#initCredentialsForCluster} against secure and insecure
 * mini clusters.
 */
@Category({ MapReduceTests.class, MediumTests.class })
public class TestTableMapReduceUtil {
  // SPN used for the MiniKdc HTTP endpoint principal in the secure-cluster tests.
  private static final String HTTP_PRINCIPAL = "HTTP/localhost";

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);

  /**
   * Reset cached SASL client authentication providers after each test so that
   * state from a secure test case does not leak into an insecure one (or vice versa).
   */
  @After
  public void after() {
    SaslClientAuthenticationProviders.reset();
  }

  /*
   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because the method
   * depends on an online cluster.
   */

  /**
   * initTableMapperJob with a String table name and an explicit input format class.
   */
  @Test
  public void testInitTableMapperJob1() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    // test
    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
      Text.class, job, false, WALInputFormat.class);
    assertEquals(WALInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    // NOTE(review): Text.class above is passed as the *map* output key class; the job-level
    // output key class is expected to remain Hadoop's default (LongWritable).
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * Same as {@link #testInitTableMapperJob1()} but passing the table name as bytes.
   */
  @Test
  public void testInitTableMapperJob2() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false, WALInputFormat.class);
    assertEquals(WALInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * initTableMapperJob variant without an explicit input format: the default
   * TableInputFormat should be configured on the job.
   */
  @Test
  public void testInitTableMapperJob3() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job);
    assertEquals(TableInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * initTableMapperJob variant with addDependencyJars=false and the default input format.
   */
  @Test
  public void testInitTableMapperJob4() throws Exception {
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, "tableName");
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), Import.Importer.class,
      Text.class, Text.class, job, false);
    assertEquals(TableInputFormat.class, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }

  /**
   * Start a Kerberos-secured mini cluster on the given testing utility.
   * <p>
   * Enables security, visibility labels, the AccessController and TokenProvider
   * coprocessors, and wires the Kerberos principals from the supplied KDC into the
   * configuration before booting the cluster.
   * @param util the testing utility whose configuration is secured and whose cluster is started
   * @param kdc a running MiniKdc supplying the realm for the principals
   * @param principal the service principal (without realm) for the cluster
   * @return a Closeable that shuts the mini cluster down
   * @throws Exception if the cluster fails to start or the ACL table is never assigned
   *           (in which case the cluster is shut down before rethrowing)
   */
  private static Closeable startSecureMiniCluster(HBaseTestingUtility util, MiniKdc kdc,
    String principal) throws Exception {
    Configuration conf = util.getConfiguration();

    SecureTestUtil.enableSecurity(conf);
    VisibilityTestUtil.enableVisiblityLabels(conf);
    SecureTestUtil.verifyConfiguration(conf);

    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      AccessController.class.getName() + ',' + TokenProvider.class.getName());

    HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(),
      HTTP_PRINCIPAL + '@' + kdc.getRealm());

    // Re-read the default realm after the secured configuration is in place.
    KerberosName.resetDefaultRealm();

    util.startMiniCluster();
    try {
      util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
    } catch (Exception e) {
      // Don't leak a half-started cluster if the ACL table never comes online.
      util.shutdownMiniCluster();
      throw e;
    }

    return util::shutdownMiniCluster;
  }

  /**
   * Both source and target clusters are insecure: initCredentialsForCluster should
   * not obtain any delegation token.
   */
  @Test
  public void testInitCredentialsForCluster1() throws Exception {
    HBaseTestingUtility util1 = new HBaseTestingUtility();
    HBaseTestingUtility util2 = new HBaseTestingUtility();

    util1.startMiniCluster();
    try {
      util2.startMiniCluster();
      try {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      } finally {
        util2.shutdownMiniCluster();
      }
    } finally {
      util1.shutdownMiniCluster();
    }
  }

  /**
   * Both clusters are secured (sharing one MiniKdc): initCredentialsForCluster should
   * obtain exactly one delegation token for the target cluster, keyed by its cluster id
   * and owned by the logged-in principal.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testInitCredentialsForCluster2() throws Exception {
    HBaseTestingUtility util1 = new HBaseTestingUtility();
    HBaseTestingUtility util2 = new HBaseTestingUtility();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    String username = UserGroupInformation.getLoginUser().getShortUserName();
    String userPrincipal = username + "/localhost";
    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
    // Log in before starting the secured clusters so they come up as this principal.
    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

    // NOTE(review): kdc.stop() only runs when both clusters started successfully; a
    // failure inside startSecureMiniCluster leaves the KDC running for this test.
    try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal);
      Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
      try {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertEquals(1, tokens.size());

        // Tokens are stored under the target cluster's id as read from ZooKeeper.
        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
        Token<AuthenticationTokenIdentifier> tokenForCluster =
          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
        assertEquals(userPrincipal + '@' + kdc.getRealm(),
          tokenForCluster.decodeIdentifier().getUsername());
      } finally {
        kdc.stop();
      }
    }
  }

  /**
   * Secure source cluster, insecure target: no token should be obtained for the
   * insecure target cluster.
   */
  @Test
  public void testInitCredentialsForCluster3() throws Exception {
    HBaseTestingUtility util1 = new HBaseTestingUtility();

    File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util1.setupMiniKdc(keytab);
    String username = UserGroupInformation.getLoginUser().getShortUserName();
    String userPrincipal = username + "/localhost";
    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

    try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal)) {
      try {
        HBaseTestingUtility util2 = new HBaseTestingUtility();
        // Assume util2 is insecure cluster
        // Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
        // once

        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertTrue(tokens.isEmpty());
      } finally {
        kdc.stop();
      }
    }
  }

  /**
   * Insecure source cluster, secure target: a delegation token should still be
   * obtained for the secure target cluster.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testInitCredentialsForCluster4() throws Exception {
    HBaseTestingUtility util1 = new HBaseTestingUtility();
    // Assume util1 is insecure cluster
    // Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once

    HBaseTestingUtility util2 = new HBaseTestingUtility();
    File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
    MiniKdc kdc = util2.setupMiniKdc(keytab);
    String username = UserGroupInformation.getLoginUser().getShortUserName();
    String userPrincipal = username + "/localhost";
    kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
    loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());

    try (Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
      try {
        Configuration conf1 = util1.getConfiguration();
        Job job = Job.getInstance(conf1);

        TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());

        Credentials credentials = job.getCredentials();
        Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
        assertEquals(1, tokens.size());

        String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
        Token<AuthenticationTokenIdentifier> tokenForCluster =
          (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
        assertEquals(userPrincipal + '@' + kdc.getRealm(),
          tokenForCluster.decodeIdentifier().getUsername());
      } finally {
        kdc.stop();
      }
    }
  }
}