001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.token; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import com.google.protobuf.BlockingService; 029import com.google.protobuf.RpcController; 030import com.google.protobuf.ServiceException; 031import java.io.IOException; 032import java.io.InterruptedIOException; 033import java.net.InetSocketAddress; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.List; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.hbase.ChoreService; 041import org.apache.hadoop.hbase.ClusterId; 042import org.apache.hadoop.hbase.CoordinatedStateManager; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.Server; 047import org.apache.hadoop.hbase.ServerName; 048import org.apache.hadoop.hbase.client.ClusterConnection; 049import org.apache.hadoop.hbase.client.Connection; 050import org.apache.hadoop.hbase.client.ConnectionFactory; 051import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 053import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; 054import org.apache.hadoop.hbase.ipc.NettyRpcServer; 055import org.apache.hadoop.hbase.ipc.RpcServer; 056import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 057import org.apache.hadoop.hbase.ipc.RpcServerFactory; 058import org.apache.hadoop.hbase.ipc.RpcServerInterface; 059import org.apache.hadoop.hbase.ipc.ServerRpcController; 060import org.apache.hadoop.hbase.ipc.SimpleRpcServer; 061import org.apache.hadoop.hbase.log.HBaseMarkers; 062import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; 063import org.apache.hadoop.hbase.regionserver.RegionServerServices; 064import org.apache.hadoop.hbase.security.SecurityInfo; 065import org.apache.hadoop.hbase.security.User; 066import org.apache.hadoop.hbase.testclassification.MediumTests; 067import org.apache.hadoop.hbase.testclassification.SecurityTests; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 070import org.apache.hadoop.hbase.util.Sleeper; 071import org.apache.hadoop.hbase.util.Strings; 072import org.apache.hadoop.hbase.util.Threads; 073import org.apache.hadoop.hbase.util.Writables; 074import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 075import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 076import org.apache.hadoop.net.DNS; 077import org.apache.hadoop.security.authorize.PolicyProvider; 078import org.apache.hadoop.security.authorize.Service; 079import org.apache.hadoop.security.token.SecretManager; 080import org.apache.hadoop.security.token.Token; 081import org.apache.hadoop.security.token.TokenIdentifier; 082import org.junit.After; 083import org.junit.Before; 084import org.junit.ClassRule; 085import org.junit.Test; 086import org.junit.experimental.categories.Category; 087import org.junit.runner.RunWith; 088import org.junit.runners.Parameterized; 089import org.junit.runners.Parameterized.Parameter; 090import org.junit.runners.Parameterized.Parameters; 091import org.mockito.Mockito; 092import org.slf4j.Logger; 093import org.slf4j.LoggerFactory; 094 095import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 096import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor; 097import org.apache.hbase.thirdparty.com.google.protobuf.Message; 098 099/** 100 * Tests for authentication token creation and usage 101 */ 102// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer 103// to offer up. It worked find pre-hbase-2.0.0 but post the shading project, it fails because 104// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded 105// protobufs. Since hbase-2.0.0, we added convertion from shaded to non-shaded so this test keeps 106// working. 107@RunWith(Parameterized.class) 108@Category({SecurityTests.class, MediumTests.class}) 109public class TestTokenAuthentication { 110 111 @ClassRule 112 public static final HBaseClassTestRule CLASS_RULE = 113 HBaseClassTestRule.forClass(TestTokenAuthentication.class); 114 115 static { 116 // Setting whatever system properties after recommendation from 117 // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html 118 System.setProperty("java.security.krb5.realm", "hbase"); 119 System.setProperty("java.security.krb5.kdc", "blah"); 120 } 121 122 /** 123 * Basic server process for RPC authentication testing 124 */ 125 private static class TokenServer extends TokenProvider implements 126 AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server { 127 private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class); 128 private Configuration conf; 129 private HBaseTestingUtility TEST_UTIL; 130 private RpcServerInterface rpcServer; 131 private InetSocketAddress isa; 132 private ZKWatcher zookeeper; 133 private Sleeper sleeper; 134 private boolean started = false; 135 private boolean aborted = false; 136 private boolean stopped = false; 137 private long startcode; 138 139 public TokenServer(Configuration conf, HBaseTestingUtility TEST_UTIL) throws IOException { 140 this.conf = conf; 141 this.TEST_UTIL = TEST_UTIL; 142 this.startcode = EnvironmentEdgeManager.currentTime(); 143 // Server to handle client requests. 144 String hostname = 145 Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default")); 146 int port = 0; 147 // Creation of an ISA will force a resolve. 148 InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); 149 if (initialIsa.getAddress() == null) { 150 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 151 } 152 final List<BlockingServiceAndInterface> sai = new ArrayList<>(1); 153 // Make a proxy to go between the shaded Service that rpc expects and the 154 // non-shaded Service this CPEP is providing. This is because this test does a neat 155 // little trick of testing the CPEP Service by inserting it as RpcServer Service. This 156 // worked fine before we shaded PB. Now we need these proxies. 157 final BlockingService service = 158 AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this); 159 final org.apache.hbase.thirdparty.com.google.protobuf.BlockingService proxy = 160 new org.apache.hbase.thirdparty.com.google.protobuf.BlockingService() { 161 @Override 162 public Message callBlockingMethod(MethodDescriptor md, 163 org.apache.hbase.thirdparty.com.google.protobuf.RpcController controller, 164 Message param) 165 throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException { 166 com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor = 167 service.getDescriptorForType().findMethodByName(md.getName()); 168 com.google.protobuf.Message request = service.getRequestPrototype(methodDescriptor); 169 // TODO: Convert rpcController 170 com.google.protobuf.Message response = null; 171 try { 172 response = service.callBlockingMethod(methodDescriptor, null, request); 173 } catch (ServiceException e) { 174 throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e); 175 } 176 return null;// Convert 'response'. 177 } 178 179 @Override 180 public ServiceDescriptor getDescriptorForType() { 181 return null; 182 } 183 184 @Override 185 public Message getRequestPrototype(MethodDescriptor arg0) { 186 // TODO Auto-generated method stub 187 return null; 188 } 189 190 @Override 191 public Message getResponsePrototype(MethodDescriptor arg0) { 192 // TODO Auto-generated method stub 193 return null; 194 } 195 }; 196 sai.add(new BlockingServiceAndInterface(proxy, 197 AuthenticationProtos.AuthenticationService.BlockingInterface.class)); 198 this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf, 199 new FifoRpcScheduler(conf, 1)); 200 InetSocketAddress address = rpcServer.getListenerAddress(); 201 if (address == null) { 202 throw new IOException("Listener channel is closed"); 203 } 204 this.isa = address; 205 this.sleeper = new Sleeper(1000, this); 206 } 207 208 @Override 209 public Configuration getConfiguration() { 210 return conf; 211 } 212 213 @Override 214 public ClusterConnection getConnection() { 215 return null; 216 } 217 218 @Override 219 public ZKWatcher getZooKeeper() { 220 return zookeeper; 221 } 222 223 @Override 224 public CoordinatedStateManager getCoordinatedStateManager() { 225 return null; 226 } 227 228 @Override 229 public boolean isAborted() { 230 return aborted; 231 } 232 233 @Override 234 public ServerName getServerName() { 235 return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode); 236 } 237 238 @Override 239 public FileSystem getFileSystem() { 240 return null; 241 } 242 243 @Override 244 public boolean isStopping() { 245 return this.stopped; 246 } 247 248 @Override 249 public void abort(String reason, Throwable error) { 250 LOG.error(HBaseMarkers.FATAL, "Aborting on: "+reason, error); 251 this.aborted = true; 252 this.stopped = true; 253 sleeper.skipSleepCycle(); 254 } 255 256 private void initialize() throws IOException { 257 // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth 258 Configuration zkConf = new Configuration(conf); 259 zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); 260 this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(), 261 this, true); 262 this.rpcServer.start(); 263 264 // Mock up region coprocessor environment 265 RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class, 266 Mockito.withSettings().extraInterfaces(HasRegionServerServices.class)); 267 when(mockRegionCpEnv.getConfiguration()).thenReturn(conf); 268 when(mockRegionCpEnv.getClassLoader()).then( 269 (var1) -> Thread.currentThread().getContextClassLoader()); 270 RegionServerServices mockRss = mock(RegionServerServices.class); 271 when(mockRss.getRpcServer()).thenReturn(rpcServer); 272 when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices()) 273 .thenReturn(mockRss); 274 275 super.start(mockRegionCpEnv); 276 started = true; 277 } 278 279 @Override 280 public void run() { 281 try { 282 initialize(); 283 while (!stopped) { 284 this.sleeper.sleep(); 285 } 286 } catch (Exception e) { 287 abort(e.getMessage(), e); 288 } 289 this.rpcServer.stop(); 290 } 291 292 public boolean isStarted() { 293 return started; 294 } 295 296 @Override 297 public void stop(String reason) { 298 LOG.info("Stopping due to: "+reason); 299 this.stopped = true; 300 sleeper.skipSleepCycle(); 301 } 302 303 @Override 304 public boolean isStopped() { 305 return stopped; 306 } 307 308 public InetSocketAddress getAddress() { 309 return isa; 310 } 311 312 public SecretManager<? extends TokenIdentifier> getSecretManager() { 313 return ((RpcServer)rpcServer).getSecretManager(); 314 } 315 316 @Override 317 public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken( 318 RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request) 319 throws ServiceException { 320 LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null)); 321 // Ignore above passed in controller -- it is always null 322 ServerRpcController serverController = new ServerRpcController(); 323 final NonShadedBlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> 324 callback = new NonShadedBlockingRpcCallback<>(); 325 getAuthenticationToken(null, request, callback); 326 try { 327 serverController.checkFailed(); 328 return callback.get(); 329 } catch (IOException ioe) { 330 throw new ServiceException(ioe); 331 } 332 } 333 334 @Override 335 public AuthenticationProtos.WhoAmIResponse whoAmI( 336 RpcController controller, AuthenticationProtos.WhoAmIRequest request) 337 throws ServiceException { 338 LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null)); 339 // Ignore above passed in controller -- it is always null 340 ServerRpcController serverController = new ServerRpcController(); 341 NonShadedBlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback = 342 new NonShadedBlockingRpcCallback<>(); 343 whoAmI(null, request, callback); 344 try { 345 serverController.checkFailed(); 346 return callback.get(); 347 } catch (IOException ioe) { 348 throw new ServiceException(ioe); 349 } 350 } 351 352 @Override 353 public ChoreService getChoreService() { 354 return null; 355 } 356 357 @Override 358 public ClusterConnection getClusterConnection() { 359 // TODO Auto-generated method stub 360 return null; 361 } 362 363 @Override 364 public Connection createConnection(Configuration conf) throws IOException { 365 return null; 366 } 367 } 368 369 @Parameters(name = "{index}: rpcServerImpl={0}") 370 public static Collection<Object[]> parameters() { 371 return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() }, 372 new Object[] { NettyRpcServer.class.getName() }); 373 } 374 375 @Parameter(0) 376 public String rpcServerImpl; 377 378 private HBaseTestingUtility TEST_UTIL; 379 private TokenServer server; 380 private Thread serverThread; 381 private AuthenticationTokenSecretManager secretManager; 382 private ClusterId clusterId = new ClusterId(); 383 384 @Before 385 public void setUp() throws Exception { 386 TEST_UTIL = new HBaseTestingUtility(); 387 // Override the connection registry to avoid spinning up a mini cluster for the connection below 388 // to go through. 389 TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 390 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 391 TEST_UTIL.startMiniZKCluster(); 392 // register token type for protocol 393 SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(), 394 new SecurityInfo("hbase.test.kerberos.principal", 395 AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN)); 396 // security settings only added after startup so that ZK does not require SASL 397 Configuration conf = TEST_UTIL.getConfiguration(); 398 conf.set("hadoop.security.authentication", "kerberos"); 399 conf.set("hbase.security.authentication", "kerberos"); 400 conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true); 401 conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl); 402 server = new TokenServer(conf, TEST_UTIL); 403 serverThread = new Thread(server); 404 Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString()); 405 // wait for startup 406 while (!server.isStarted() && !server.isStopped()) { 407 Thread.sleep(10); 408 } 409 server.rpcServer.refreshAuthManager(conf, new PolicyProvider() { 410 @Override 411 public Service[] getServices() { 412 return new Service [] { 413 new Service("security.client.protocol.acl", 414 AuthenticationProtos.AuthenticationService.BlockingInterface.class)}; 415 } 416 }); 417 ZKClusterId.setClusterId(server.getZooKeeper(), clusterId); 418 secretManager = (AuthenticationTokenSecretManager)server.getSecretManager(); 419 while(secretManager.getCurrentKey() == null) { 420 Thread.sleep(1); 421 } 422 } 423 424 @After 425 public void tearDown() throws Exception { 426 server.stop("Test complete"); 427 Threads.shutdown(serverThread); 428 TEST_UTIL.shutdownMiniZKCluster(); 429 } 430 431 @Test 432 public void testTokenCreation() throws Exception { 433 Token<AuthenticationTokenIdentifier> token = 434 secretManager.generateToken("testuser"); 435 436 AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier(); 437 Writables.getWritable(token.getIdentifier(), ident); 438 assertEquals("Token username should match", "testuser", 439 ident.getUsername()); 440 byte[] passwd = secretManager.retrievePassword(ident); 441 assertTrue("Token password and password from secret manager should match", 442 Bytes.equals(token.getPassword(), passwd)); 443 } 444// This won't work any more now RpcServer takes Shaded Service. It depends on RPCServer being able to provide a 445// non-shaded service. TODO: FIX. Tried to make RPC generic but then it ripples; have to make Connection generic. 446// And Call generic, etc. 447// 448// @Test 449// public void testTokenAuthentication() throws Exception { 450// UserGroupInformation testuser = 451// UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"}); 452// testuser.setAuthenticationMethod( 453// UserGroupInformation.AuthenticationMethod.TOKEN); 454// final Configuration conf = TEST_UTIL.getConfiguration(); 455// UserGroupInformation.setConfiguration(conf); 456// Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser"); 457// LOG.debug("Got token: " + token.toString()); 458// testuser.addToken(token); 459// // Verify the server authenticates us as this token user 460// testuser.doAs(new PrivilegedExceptionAction<Object>() { 461// public Object run() throws Exception { 462// Configuration c = server.getConfiguration(); 463// final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString()); 464// ServerName sn = 465// ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(), 466// System.currentTimeMillis()); 467// try { 468// // Make a proxy to go between the shaded RpcController that rpc expects and the 469// // non-shaded controller this CPEP is providing. This is because this test does a neat 470// // little trick of testing the CPEP Service by inserting it as RpcServer Service. This 471// // worked fine before we shaded PB. Now we need these proxies. 472// final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel = 473// rpcClient.createBlockingRpcChannel(sn, User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 474// AuthenticationProtos.AuthenticationService.BlockingInterface stub = 475// AuthenticationProtos.AuthenticationService.newBlockingStub(channel); 476// AuthenticationProtos.WhoAmIResponse response = 477// stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance()); 478// String myname = response.getUsername(); 479// assertEquals("testuser", myname); 480// String authMethod = response.getAuthMethod(); 481// assertEquals("TOKEN", authMethod); 482// } finally { 483// rpcClient.close(); 484// } 485// return null; 486// } 487// }); 488// } 489 490 @Test 491 public void testUseExistingToken() throws Exception { 492 User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2", 493 new String[]{"testgroup"}); 494 Token<AuthenticationTokenIdentifier> token = 495 secretManager.generateToken(user.getName()); 496 assertNotNull(token); 497 user.addToken(token); 498 499 // make sure we got a token 500 Token<AuthenticationTokenIdentifier> firstToken = 501 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 502 assertNotNull(firstToken); 503 assertEquals(token, firstToken); 504 505 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 506 try { 507 assertFalse(TokenUtil.addTokenIfMissing(conn, user)); 508 // make sure we still have the same token 509 Token<AuthenticationTokenIdentifier> secondToken = 510 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 511 assertEquals(firstToken, secondToken); 512 } finally { 513 conn.close(); 514 } 515 } 516 517 /** 518 * A copy of the BlockingRpcCallback class for use locally. Only difference is that it makes 519 * use of non-shaded protobufs; i.e. refers to com.google.protobuf.* rather than to 520 * org.apache.hbase.thirdparty.com.google.protobuf.* 521 */ 522 private static class NonShadedBlockingRpcCallback<R> implements 523 com.google.protobuf.RpcCallback<R> { 524 private R result; 525 private boolean resultSet = false; 526 527 /** 528 * Called on completion of the RPC call with the response object, or {@code null} in the case of 529 * an error. 530 * @param parameter the response object or {@code null} if an error occurred 531 */ 532 @Override 533 public void run(R parameter) { 534 synchronized (this) { 535 result = parameter; 536 resultSet = true; 537 this.notifyAll(); 538 } 539 } 540 541 /** 542 * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was 543 * passed. When used asynchronously, this method will block until the {@link #run(Object)} 544 * method has been called. 545 * @return the response object or {@code null} if no response was passed 546 */ 547 public synchronized R get() throws IOException { 548 while (!resultSet) { 549 try { 550 this.wait(); 551 } catch (InterruptedException ie) { 552 InterruptedIOException exception = new InterruptedIOException(ie.getMessage()); 553 exception.initCause(ie); 554 throw exception; 555 } 556 } 557 return result; 558 } 559 } 560}