001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.token; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertNotNull; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import java.io.IOException; 029import java.net.InetSocketAddress; 030import java.util.ArrayList; 031import java.util.List; 032import java.util.stream.Stream; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.hbase.ChoreService; 036import org.apache.hadoop.hbase.ClusterId; 037import org.apache.hadoop.hbase.CoordinatedStateManager; 038import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.Server; 042import 
org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.client.AsyncClusterConnection; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionFactory; 046import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 048import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; 049import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; 050import org.apache.hadoop.hbase.ipc.NettyRpcServer; 051import org.apache.hadoop.hbase.ipc.RpcServer; 052import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 053import org.apache.hadoop.hbase.ipc.RpcServerFactory; 054import org.apache.hadoop.hbase.ipc.RpcServerInterface; 055import org.apache.hadoop.hbase.ipc.ServerRpcController; 056import org.apache.hadoop.hbase.ipc.SimpleRpcServer; 057import org.apache.hadoop.hbase.keymeta.KeyManagementService; 058import org.apache.hadoop.hbase.log.HBaseMarkers; 059import org.apache.hadoop.hbase.regionserver.RegionServerServices; 060import org.apache.hadoop.hbase.security.SecurityInfo; 061import org.apache.hadoop.hbase.security.User; 062import org.apache.hadoop.hbase.testclassification.SecurityTests; 063import org.apache.hadoop.hbase.testclassification.SmallTests; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.hbase.util.Sleeper; 067import org.apache.hadoop.hbase.util.Strings; 068import org.apache.hadoop.hbase.util.Threads; 069import org.apache.hadoop.hbase.util.Writables; 070import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 071import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 072import org.apache.hadoop.net.DNS; 073import org.apache.hadoop.security.authorize.PolicyProvider; 074import org.apache.hadoop.security.authorize.Service; 075import org.apache.hadoop.security.token.SecretManager; 076import org.apache.hadoop.security.token.Token; 077import 
org.apache.hadoop.security.token.TokenIdentifier; 078import org.junit.jupiter.api.AfterEach; 079import org.junit.jupiter.api.BeforeEach; 080import org.junit.jupiter.api.Tag; 081import org.junit.jupiter.api.TestTemplate; 082import org.junit.jupiter.params.provider.Arguments; 083import org.mockito.Mockito; 084import org.slf4j.Logger; 085import org.slf4j.LoggerFactory; 086 087import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 088import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 089import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor; 090import org.apache.hbase.thirdparty.com.google.protobuf.Message; 091import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 092import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 093 094import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos; 095 096/** 097 * Tests for authentication token creation and usage 098 */ 099// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer 100// to offer up. It worked fine pre-hbase-2.0.0 but post the shading project, it fails because 101// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded 102// protobufs. Since hbase-2.0.0, we added conversion from shaded to non-shaded so this test keeps 103// working. 
104@Tag(SecurityTests.TAG) 105@Tag(SmallTests.TAG) 106@HBaseParameterizedTestTemplate(name = "{index}: rpcServerImpl={0}") 107public class TestTokenAuthentication { 108 109 static { 110 // Setting whatever system properties after recommendation from 111 // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html 112 System.setProperty("java.security.krb5.realm", "hbase"); 113 System.setProperty("java.security.krb5.kdc", "blah"); 114 } 115 116 /** 117 * Basic server process for RPC authentication testing 118 */ 119 private static class TokenServer extends TokenProvider 120 implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server { 121 private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class); 122 private Configuration conf; 123 private RpcServerInterface rpcServer; 124 private InetSocketAddress isa; 125 private ZKWatcher zookeeper; 126 private Sleeper sleeper; 127 private boolean started = false; 128 private boolean aborted = false; 129 private boolean stopped = false; 130 private long startcode; 131 132 public TokenServer(Configuration conf) throws IOException { 133 this.conf = conf; 134 this.startcode = EnvironmentEdgeManager.currentTime(); 135 // Server to handle client requests. 136 String hostname = 137 Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default")); 138 int port = 0; 139 // Creation of an ISA will force a resolve. 140 InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); 141 if (initialIsa.getAddress() == null) { 142 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 143 } 144 final List<BlockingServiceAndInterface> sai = new ArrayList<>(1); 145 // Make a proxy to go between the shaded Service that rpc expects and the 146 // non-shaded Service this CPEP is providing. This is because this test does a neat 147 // little trick of testing the CPEP Service by inserting it as RpcServer Service. 
This 148 // worked fine before we shaded PB. Now we need these proxies. 149 final BlockingService service = 150 AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this); 151 final BlockingService proxy = new BlockingService() { 152 @Override 153 public Message callBlockingMethod(MethodDescriptor md, RpcController controller, 154 Message param) throws ServiceException { 155 MethodDescriptor methodDescriptor = 156 service.getDescriptorForType().findMethodByName(md.getName()); 157 Message request = service.getRequestPrototype(methodDescriptor); 158 // TODO: Convert rpcController 159 Message response = null; 160 try { 161 response = service.callBlockingMethod(methodDescriptor, null, request); 162 } catch (ServiceException e) { 163 throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e); 164 } 165 return null;// Convert 'response'. 166 } 167 168 @Override 169 public ServiceDescriptor getDescriptorForType() { 170 return null; 171 } 172 173 @Override 174 public Message getRequestPrototype(MethodDescriptor arg0) { 175 // TODO Auto-generated method stub 176 return null; 177 } 178 179 @Override 180 public Message getResponsePrototype(MethodDescriptor arg0) { 181 // TODO Auto-generated method stub 182 return null; 183 } 184 }; 185 sai.add(new BlockingServiceAndInterface(proxy, 186 AuthenticationProtos.AuthenticationService.BlockingInterface.class)); 187 this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf, 188 new FifoRpcScheduler(conf, 1)); 189 InetSocketAddress address = rpcServer.getListenerAddress(); 190 if (address == null) { 191 throw new IOException("Listener channel is closed"); 192 } 193 this.isa = address; 194 this.sleeper = new Sleeper(1000, this); 195 } 196 197 @Override 198 public Configuration getConfiguration() { 199 return conf; 200 } 201 202 @Override 203 public Connection getConnection() { 204 return null; 205 } 206 207 @Override 208 public ZKWatcher getZooKeeper() { 209 
return zookeeper; 210 } 211 212 @Override 213 public CoordinatedStateManager getCoordinatedStateManager() { 214 return null; 215 } 216 217 @Override 218 public boolean isAborted() { 219 return aborted; 220 } 221 222 @Override 223 public ServerName getServerName() { 224 return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode); 225 } 226 227 @Override 228 public FileSystem getFileSystem() { 229 return null; 230 } 231 232 @Override 233 public boolean isStopping() { 234 return this.stopped; 235 } 236 237 @Override 238 public void abort(String reason, Throwable error) { 239 LOG.error(HBaseMarkers.FATAL, "Aborting on: " + reason, error); 240 this.aborted = true; 241 this.stopped = true; 242 sleeper.skipSleepCycle(); 243 } 244 245 private void initialize() throws IOException { 246 // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth 247 Configuration zkConf = new Configuration(conf); 248 zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); 249 this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(), this, true); 250 this.rpcServer.start(); 251 252 // Mock up region coprocessor environment 253 RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class, 254 Mockito.withSettings().extraInterfaces(HasRegionServerServices.class)); 255 when(mockRegionCpEnv.getConfiguration()).thenReturn(conf); 256 when(mockRegionCpEnv.getClassLoader()) 257 .then((var1) -> Thread.currentThread().getContextClassLoader()); 258 RegionServerServices mockRss = mock(RegionServerServices.class); 259 when(mockRss.getRpcServer()).thenReturn(rpcServer); 260 when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices()) 261 .thenReturn(mockRss); 262 263 super.start(mockRegionCpEnv); 264 started = true; 265 } 266 267 @Override 268 public void run() { 269 try { 270 initialize(); 271 while (!stopped) { 272 this.sleeper.sleep(); 273 } 274 } catch (Exception e) { 275 abort(e.getMessage(), e); 276 } 
277 this.rpcServer.stop(); 278 } 279 280 public boolean isStarted() { 281 return started; 282 } 283 284 @Override 285 public void stop(String reason) { 286 LOG.info("Stopping due to: " + reason); 287 this.stopped = true; 288 sleeper.skipSleepCycle(); 289 } 290 291 @Override 292 public boolean isStopped() { 293 return stopped; 294 } 295 296 public SecretManager<? extends TokenIdentifier> getSecretManager() { 297 return ((RpcServer) rpcServer).getSecretManager(); 298 } 299 300 @Override 301 public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken( 302 RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request) 303 throws ServiceException { 304 LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null)); 305 // Ignore above passed in controller -- it is always null 306 ServerRpcController serverController = new ServerRpcController(); 307 final BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback = 308 new BlockingRpcCallback<>(); 309 getAuthenticationToken(null, request, callback); 310 try { 311 serverController.checkFailed(); 312 return callback.get(); 313 } catch (IOException ioe) { 314 throw new ServiceException(ioe); 315 } 316 } 317 318 @Override 319 public AuthenticationProtos.WhoAmIResponse whoAmI(RpcController controller, 320 AuthenticationProtos.WhoAmIRequest request) throws ServiceException { 321 LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null)); 322 // Ignore above passed in controller -- it is always null 323 ServerRpcController serverController = new ServerRpcController(); 324 BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback = 325 new BlockingRpcCallback<>(); 326 whoAmI(null, request, callback); 327 try { 328 serverController.checkFailed(); 329 return callback.get(); 330 } catch (IOException ioe) { 331 throw new ServiceException(ioe); 332 } 333 } 334 335 @Override 336 public ChoreService 
getChoreService() { 337 return null; 338 } 339 340 @Override 341 public Connection createConnection(Configuration conf) throws IOException { 342 return null; 343 } 344 345 @Override 346 public AsyncClusterConnection getAsyncClusterConnection() { 347 return null; 348 } 349 350 @Override 351 public KeyManagementService getKeyManagementService() { 352 return null; 353 } 354 } 355 356 public static Stream<Arguments> parameters() { 357 return Stream.of(Arguments.of(SimpleRpcServer.class.getName()), 358 Arguments.of(NettyRpcServer.class.getName())); 359 } 360 361 public String rpcServerImpl; 362 363 public TestTokenAuthentication(String rpcServerImpl) { 364 this.rpcServerImpl = rpcServerImpl; 365 } 366 367 private HBaseTestingUtil TEST_UTIL; 368 private TokenServer server; 369 private Thread serverThread; 370 private AuthenticationTokenSecretManager secretManager; 371 private ClusterId clusterId = new ClusterId(); 372 373 @BeforeEach 374 public void setUp() throws Exception { 375 TEST_UTIL = new HBaseTestingUtil(); 376 // Override the connection registry to avoid spinning up a mini cluster for the connection below 377 // to go through. 
378 TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 379 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 380 TEST_UTIL.startMiniZKCluster(); 381 // register token type for protocol 382 SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(), 383 new SecurityInfo("hbase.test.kerberos.principal", 384 AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN)); 385 // security settings only added after startup so that ZK does not require SASL 386 Configuration conf = TEST_UTIL.getConfiguration(); 387 conf.set("hadoop.security.authentication", "kerberos"); 388 conf.set("hbase.security.authentication", "kerberos"); 389 conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true); 390 conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl); 391 server = new TokenServer(conf); 392 serverThread = new Thread(server); 393 Threads.setDaemonThreadRunning(serverThread, 394 "TokenServer:" + server.getServerName().toString()); 395 // wait for startup 396 while (!server.isStarted() && !server.isStopped()) { 397 Thread.sleep(10); 398 } 399 server.rpcServer.refreshAuthManager(conf, new PolicyProvider() { 400 @Override 401 public Service[] getServices() { 402 return new Service[] { new Service("security.client.protocol.acl", 403 AuthenticationProtos.AuthenticationService.BlockingInterface.class) }; 404 } 405 }); 406 ZKClusterId.setClusterId(server.getZooKeeper(), clusterId); 407 secretManager = (AuthenticationTokenSecretManager) server.getSecretManager(); 408 while (secretManager.getCurrentKey() == null) { 409 Thread.sleep(1); 410 } 411 } 412 413 @AfterEach 414 public void tearDown() throws Exception { 415 server.stop("Test complete"); 416 Threads.shutdown(serverThread); 417 TEST_UTIL.shutdownMiniZKCluster(); 418 } 419 420 @TestTemplate 421 public void testTokenCreation() throws Exception { 422 Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser"); 423 424 
AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier(); 425 Writables.getWritable(token.getIdentifier(), ident); 426 assertEquals("testuser", ident.getUsername(), "Token username should match"); 427 byte[] passwd = secretManager.retrievePassword(ident); 428 assertTrue(Bytes.equals(token.getPassword(), passwd), 429 "Token password and password from secret manager should match"); 430 } 431 // This won't work any more now RpcServer takes Shaded Service. It depends on RPCServer being able 432 // to provide a 433 // non-shaded service. TODO: FIX. Tried to make RPC generic but then it ripples; have to make 434 // Connection generic. 435 // And Call generic, etc. 436 // 437 // @Test 438 // public void testTokenAuthentication() throws Exception { 439 // UserGroupInformation testuser = 440 // UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"}); 441 // testuser.setAuthenticationMethod( 442 // UserGroupInformation.AuthenticationMethod.TOKEN); 443 // final Configuration conf = TEST_UTIL.getConfiguration(); 444 // UserGroupInformation.setConfiguration(conf); 445 // Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser"); 446 // LOG.debug("Got token: " + token.toString()); 447 // testuser.addToken(token); 448 // // Verify the server authenticates us as this token user 449 // testuser.doAs(new PrivilegedExceptionAction<Object>() { 450 // public Object run() throws Exception { 451 // Configuration c = server.getConfiguration(); 452 // final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString()); 453 // ServerName sn = 454 // ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(), 455 // EnvironmentEdgeManager.currentTime()); 456 // try { 457 // // Make a proxy to go between the shaded RpcController that rpc expects and the 458 // // non-shaded controller this CPEP is providing. 
This is because this test does a neat 459 // // little trick of testing the CPEP Service by inserting it as RpcServer Service. This 460 // // worked fine before we shaded PB. Now we need these proxies. 461 // final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel = 462 // rpcClient.createBlockingRpcChannel(sn, User.getCurrent(), 463 // HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 464 // AuthenticationProtos.AuthenticationService.BlockingInterface stub = 465 // AuthenticationProtos.AuthenticationService.newBlockingStub(channel); 466 // AuthenticationProtos.WhoAmIResponse response = 467 // stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance()); 468 // String myname = response.getUsername(); 469 // assertEquals("testuser", myname); 470 // String authMethod = response.getAuthMethod(); 471 // assertEquals("TOKEN", authMethod); 472 // } finally { 473 // rpcClient.close(); 474 // } 475 // return null; 476 // } 477 // }); 478 // } 479 480 @TestTemplate 481 public void testUseExistingToken() throws Exception { 482 User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2", 483 new String[] { "testgroup" }); 484 Token<AuthenticationTokenIdentifier> token = secretManager.generateToken(user.getName()); 485 assertNotNull(token); 486 user.addToken(token); 487 488 // make sure we got a token 489 Token<AuthenticationTokenIdentifier> firstToken = 490 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 491 assertNotNull(firstToken); 492 assertEquals(token, firstToken); 493 494 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 495 try { 496 assertFalse(TokenUtil.addTokenIfMissing(conn, user)); 497 // make sure we still have the same token 498 Token<AuthenticationTokenIdentifier> secondToken = 499 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 500 assertEquals(firstToken, secondToken); 501 } finally { 502 conn.close(); 
503 } 504 } 505}