001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.token; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.net.InetSocketAddress; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.Collection; 033import java.util.List; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.hbase.ChoreService; 037import org.apache.hadoop.hbase.ClusterId; 038import org.apache.hadoop.hbase.CoordinatedStateManager; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.Server; 043import org.apache.hadoop.hbase.ServerName; 044import 
org.apache.hadoop.hbase.client.AsyncClusterConnection; 045import org.apache.hadoop.hbase.client.Connection; 046import org.apache.hadoop.hbase.client.ConnectionFactory; 047import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 049import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; 050import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; 051import org.apache.hadoop.hbase.ipc.NettyRpcServer; 052import org.apache.hadoop.hbase.ipc.RpcServer; 053import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 054import org.apache.hadoop.hbase.ipc.RpcServerFactory; 055import org.apache.hadoop.hbase.ipc.RpcServerInterface; 056import org.apache.hadoop.hbase.ipc.ServerRpcController; 057import org.apache.hadoop.hbase.ipc.SimpleRpcServer; 058import org.apache.hadoop.hbase.log.HBaseMarkers; 059import org.apache.hadoop.hbase.regionserver.RegionServerServices; 060import org.apache.hadoop.hbase.security.SecurityInfo; 061import org.apache.hadoop.hbase.security.User; 062import org.apache.hadoop.hbase.testclassification.MediumTests; 063import org.apache.hadoop.hbase.testclassification.SecurityTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.hbase.util.Sleeper; 067import org.apache.hadoop.hbase.util.Strings; 068import org.apache.hadoop.hbase.util.Threads; 069import org.apache.hadoop.hbase.util.Writables; 070import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 071import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 072import org.apache.hadoop.net.DNS; 073import org.apache.hadoop.security.authorize.PolicyProvider; 074import org.apache.hadoop.security.authorize.Service; 075import org.apache.hadoop.security.token.SecretManager; 076import org.apache.hadoop.security.token.Token; 077import org.apache.hadoop.security.token.TokenIdentifier; 078import org.junit.After; 079import org.junit.Before; 
080import org.junit.ClassRule; 081import org.junit.Test; 082import org.junit.experimental.categories.Category; 083import org.junit.runner.RunWith; 084import org.junit.runners.Parameterized; 085import org.junit.runners.Parameterized.Parameter; 086import org.junit.runners.Parameterized.Parameters; 087import org.mockito.Mockito; 088import org.slf4j.Logger; 089import org.slf4j.LoggerFactory; 090 091import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 092import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 093import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor; 094import org.apache.hbase.thirdparty.com.google.protobuf.Message; 095import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 096import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 097 098import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos; 099 100/** 101 * Tests for authentication token creation and usage 102 */ 103// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer 104// to offer up. It worked fine pre-hbase-2.0.0 but post the shading project, it fails because 105// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded 106// protobufs. Since hbase-2.0.0, we added conversion from shaded to non-shaded so this test keeps 107// working. 
108@RunWith(Parameterized.class) 109@Category({ SecurityTests.class, MediumTests.class }) 110public class TestTokenAuthentication { 111 112 @ClassRule 113 public static final HBaseClassTestRule CLASS_RULE = 114 HBaseClassTestRule.forClass(TestTokenAuthentication.class); 115 116 static { 117 // Setting whatever system properties after recommendation from 118 // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html 119 System.setProperty("java.security.krb5.realm", "hbase"); 120 System.setProperty("java.security.krb5.kdc", "blah"); 121 } 122 123 /** 124 * Basic server process for RPC authentication testing 125 */ 126 private static class TokenServer extends TokenProvider 127 implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server { 128 private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class); 129 private Configuration conf; 130 private HBaseTestingUtil TEST_UTIL; 131 private RpcServerInterface rpcServer; 132 private InetSocketAddress isa; 133 private ZKWatcher zookeeper; 134 private Sleeper sleeper; 135 private boolean started = false; 136 private boolean aborted = false; 137 private boolean stopped = false; 138 private long startcode; 139 140 public TokenServer(Configuration conf, HBaseTestingUtil TEST_UTIL) throws IOException { 141 this.conf = conf; 142 this.TEST_UTIL = TEST_UTIL; 143 this.startcode = EnvironmentEdgeManager.currentTime(); 144 // Server to handle client requests. 145 String hostname = 146 Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default")); 147 int port = 0; 148 // Creation of an ISA will force a resolve. 
149 InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); 150 if (initialIsa.getAddress() == null) { 151 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 152 } 153 final List<BlockingServiceAndInterface> sai = new ArrayList<>(1); 154 // Make a proxy to go between the shaded Service that rpc expects and the 155 // non-shaded Service this CPEP is providing. This is because this test does a neat 156 // little trick of testing the CPEP Service by inserting it as RpcServer Service. This 157 // worked fine before we shaded PB. Now we need these proxies. 158 final BlockingService service = 159 AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this); 160 final BlockingService proxy = new BlockingService() { 161 @Override 162 public Message callBlockingMethod(MethodDescriptor md, RpcController controller, 163 Message param) throws ServiceException { 164 MethodDescriptor methodDescriptor = 165 service.getDescriptorForType().findMethodByName(md.getName()); 166 Message request = service.getRequestPrototype(methodDescriptor); 167 // TODO: Convert rpcController 168 Message response = null; 169 try { 170 response = service.callBlockingMethod(methodDescriptor, null, request); 171 } catch (ServiceException e) { 172 throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e); 173 } 174 return null;// Convert 'response'. 
175 } 176 177 @Override 178 public ServiceDescriptor getDescriptorForType() { 179 return null; 180 } 181 182 @Override 183 public Message getRequestPrototype(MethodDescriptor arg0) { 184 // TODO Auto-generated method stub 185 return null; 186 } 187 188 @Override 189 public Message getResponsePrototype(MethodDescriptor arg0) { 190 // TODO Auto-generated method stub 191 return null; 192 } 193 }; 194 sai.add(new BlockingServiceAndInterface(proxy, 195 AuthenticationProtos.AuthenticationService.BlockingInterface.class)); 196 this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf, 197 new FifoRpcScheduler(conf, 1)); 198 InetSocketAddress address = rpcServer.getListenerAddress(); 199 if (address == null) { 200 throw new IOException("Listener channel is closed"); 201 } 202 this.isa = address; 203 this.sleeper = new Sleeper(1000, this); 204 } 205 206 @Override 207 public Configuration getConfiguration() { 208 return conf; 209 } 210 211 @Override 212 public Connection getConnection() { 213 return null; 214 } 215 216 @Override 217 public ZKWatcher getZooKeeper() { 218 return zookeeper; 219 } 220 221 @Override 222 public CoordinatedStateManager getCoordinatedStateManager() { 223 return null; 224 } 225 226 @Override 227 public boolean isAborted() { 228 return aborted; 229 } 230 231 @Override 232 public ServerName getServerName() { 233 return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode); 234 } 235 236 @Override 237 public FileSystem getFileSystem() { 238 return null; 239 } 240 241 @Override 242 public boolean isStopping() { 243 return this.stopped; 244 } 245 246 @Override 247 public void abort(String reason, Throwable error) { 248 LOG.error(HBaseMarkers.FATAL, "Aborting on: " + reason, error); 249 this.aborted = true; 250 this.stopped = true; 251 sleeper.skipSleepCycle(); 252 } 253 254 private void initialize() throws IOException { 255 // ZK configuration must _not_ have hbase.security.authentication or it will require 
SASL auth 256 Configuration zkConf = new Configuration(conf); 257 zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); 258 this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(), this, true); 259 this.rpcServer.start(); 260 261 // Mock up region coprocessor environment 262 RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class, 263 Mockito.withSettings().extraInterfaces(HasRegionServerServices.class)); 264 when(mockRegionCpEnv.getConfiguration()).thenReturn(conf); 265 when(mockRegionCpEnv.getClassLoader()) 266 .then((var1) -> Thread.currentThread().getContextClassLoader()); 267 RegionServerServices mockRss = mock(RegionServerServices.class); 268 when(mockRss.getRpcServer()).thenReturn(rpcServer); 269 when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices()) 270 .thenReturn(mockRss); 271 272 super.start(mockRegionCpEnv); 273 started = true; 274 } 275 276 @Override 277 public void run() { 278 try { 279 initialize(); 280 while (!stopped) { 281 this.sleeper.sleep(); 282 } 283 } catch (Exception e) { 284 abort(e.getMessage(), e); 285 } 286 this.rpcServer.stop(); 287 } 288 289 public boolean isStarted() { 290 return started; 291 } 292 293 @Override 294 public void stop(String reason) { 295 LOG.info("Stopping due to: " + reason); 296 this.stopped = true; 297 sleeper.skipSleepCycle(); 298 } 299 300 @Override 301 public boolean isStopped() { 302 return stopped; 303 } 304 305 public InetSocketAddress getAddress() { 306 return isa; 307 } 308 309 public SecretManager<? 
extends TokenIdentifier> getSecretManager() { 310 return ((RpcServer) rpcServer).getSecretManager(); 311 } 312 313 @Override 314 public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken( 315 RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request) 316 throws ServiceException { 317 LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null)); 318 // Ignore above passed in controller -- it is always null 319 ServerRpcController serverController = new ServerRpcController(); 320 final BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback = 321 new BlockingRpcCallback<>(); 322 getAuthenticationToken(null, request, callback); 323 try { 324 serverController.checkFailed(); 325 return callback.get(); 326 } catch (IOException ioe) { 327 throw new ServiceException(ioe); 328 } 329 } 330 331 @Override 332 public AuthenticationProtos.WhoAmIResponse whoAmI(RpcController controller, 333 AuthenticationProtos.WhoAmIRequest request) throws ServiceException { 334 LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null)); 335 // Ignore above passed in controller -- it is always null 336 ServerRpcController serverController = new ServerRpcController(); 337 BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback = 338 new BlockingRpcCallback<>(); 339 whoAmI(null, request, callback); 340 try { 341 serverController.checkFailed(); 342 return callback.get(); 343 } catch (IOException ioe) { 344 throw new ServiceException(ioe); 345 } 346 } 347 348 @Override 349 public ChoreService getChoreService() { 350 return null; 351 } 352 353 @Override 354 public Connection createConnection(Configuration conf) throws IOException { 355 return null; 356 } 357 358 @Override 359 public AsyncClusterConnection getAsyncClusterConnection() { 360 return null; 361 } 362 } 363 364 @Parameters(name = "{index}: rpcServerImpl={0}") 365 public static Collection<Object[]> 
parameters() { 366 return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() }, 367 new Object[] { NettyRpcServer.class.getName() }); 368 } 369 370 @Parameter(0) 371 public String rpcServerImpl; 372 373 private HBaseTestingUtil TEST_UTIL; 374 private TokenServer server; 375 private Thread serverThread; 376 private AuthenticationTokenSecretManager secretManager; 377 private ClusterId clusterId = new ClusterId(); 378 379 @Before 380 public void setUp() throws Exception { 381 TEST_UTIL = new HBaseTestingUtil(); 382 // Override the connection registry to avoid spinning up a mini cluster for the connection below 383 // to go through. 384 TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 385 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 386 TEST_UTIL.startMiniZKCluster(); 387 // register token type for protocol 388 SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(), 389 new SecurityInfo("hbase.test.kerberos.principal", 390 AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN)); 391 // security settings only added after startup so that ZK does not require SASL 392 Configuration conf = TEST_UTIL.getConfiguration(); 393 conf.set("hadoop.security.authentication", "kerberos"); 394 conf.set("hbase.security.authentication", "kerberos"); 395 conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true); 396 conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl); 397 server = new TokenServer(conf, TEST_UTIL); 398 serverThread = new Thread(server); 399 Threads.setDaemonThreadRunning(serverThread, 400 "TokenServer:" + server.getServerName().toString()); 401 // wait for startup 402 while (!server.isStarted() && !server.isStopped()) { 403 Thread.sleep(10); 404 } 405 server.rpcServer.refreshAuthManager(conf, new PolicyProvider() { 406 @Override 407 public Service[] getServices() { 408 return new Service[] { new Service("security.client.protocol.acl", 409 
AuthenticationProtos.AuthenticationService.BlockingInterface.class) }; 410 } 411 }); 412 ZKClusterId.setClusterId(server.getZooKeeper(), clusterId); 413 secretManager = (AuthenticationTokenSecretManager) server.getSecretManager(); 414 while (secretManager.getCurrentKey() == null) { 415 Thread.sleep(1); 416 } 417 } 418 419 @After 420 public void tearDown() throws Exception { 421 server.stop("Test complete"); 422 Threads.shutdown(serverThread); 423 TEST_UTIL.shutdownMiniZKCluster(); 424 } 425 426 @Test 427 public void testTokenCreation() throws Exception { 428 Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser"); 429 430 AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier(); 431 Writables.getWritable(token.getIdentifier(), ident); 432 assertEquals("Token username should match", "testuser", ident.getUsername()); 433 byte[] passwd = secretManager.retrievePassword(ident); 434 assertTrue("Token password and password from secret manager should match", 435 Bytes.equals(token.getPassword(), passwd)); 436 } 437 // This won't work any more now RpcServer takes Shaded Service. It depends on RPCServer being able 438 // to provide a 439 // non-shaded service. TODO: FIX. Tried to make RPC generic but then it ripples; have to make 440 // Connection generic. 441 // And Call generic, etc. 
442 // 443 // @Test 444 // public void testTokenAuthentication() throws Exception { 445 // UserGroupInformation testuser = 446 // UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"}); 447 // testuser.setAuthenticationMethod( 448 // UserGroupInformation.AuthenticationMethod.TOKEN); 449 // final Configuration conf = TEST_UTIL.getConfiguration(); 450 // UserGroupInformation.setConfiguration(conf); 451 // Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser"); 452 // LOG.debug("Got token: " + token.toString()); 453 // testuser.addToken(token); 454 // // Verify the server authenticates us as this token user 455 // testuser.doAs(new PrivilegedExceptionAction<Object>() { 456 // public Object run() throws Exception { 457 // Configuration c = server.getConfiguration(); 458 // final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString()); 459 // ServerName sn = 460 // ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(), 461 // EnvironmentEdgeManager.currentTime()); 462 // try { 463 // // Make a proxy to go between the shaded RpcController that rpc expects and the 464 // // non-shaded controller this CPEP is providing. This is because this test does a neat 465 // // little trick of testing the CPEP Service by inserting it as RpcServer Service. This 466 // // worked fine before we shaded PB. Now we need these proxies. 
467 // final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel = 468 // rpcClient.createBlockingRpcChannel(sn, User.getCurrent(), 469 // HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 470 // AuthenticationProtos.AuthenticationService.BlockingInterface stub = 471 // AuthenticationProtos.AuthenticationService.newBlockingStub(channel); 472 // AuthenticationProtos.WhoAmIResponse response = 473 // stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance()); 474 // String myname = response.getUsername(); 475 // assertEquals("testuser", myname); 476 // String authMethod = response.getAuthMethod(); 477 // assertEquals("TOKEN", authMethod); 478 // } finally { 479 // rpcClient.close(); 480 // } 481 // return null; 482 // } 483 // }); 484 // } 485 486 @Test 487 public void testUseExistingToken() throws Exception { 488 User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2", 489 new String[] { "testgroup" }); 490 Token<AuthenticationTokenIdentifier> token = secretManager.generateToken(user.getName()); 491 assertNotNull(token); 492 user.addToken(token); 493 494 // make sure we got a token 495 Token<AuthenticationTokenIdentifier> firstToken = 496 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 497 assertNotNull(firstToken); 498 assertEquals(token, firstToken); 499 500 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 501 try { 502 assertFalse(TokenUtil.addTokenIfMissing(conn, user)); 503 // make sure we still have the same token 504 Token<AuthenticationTokenIdentifier> secondToken = 505 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 506 assertEquals(firstToken, secondToken); 507 } finally { 508 conn.close(); 509 } 510 } 511}