001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.token; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertTrue; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.when; 027 028import com.google.protobuf.BlockingService; 029import com.google.protobuf.RpcController; 030import com.google.protobuf.ServiceException; 031import java.io.IOException; 032import java.io.InterruptedIOException; 033import java.net.InetSocketAddress; 034import java.util.ArrayList; 035import java.util.Arrays; 036import java.util.Collection; 037import java.util.List; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.hbase.ChoreService; 041import org.apache.hadoop.hbase.ClusterId; 042import org.apache.hadoop.hbase.CoordinatedStateManager; 043import org.apache.hadoop.hbase.HBaseClassTestRule; 044import 
org.apache.hadoop.hbase.HBaseTestingUtility; 045import org.apache.hadoop.hbase.Server; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.client.ClusterConnection; 048import org.apache.hadoop.hbase.client.Connection; 049import org.apache.hadoop.hbase.client.ConnectionFactory; 050import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 052import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; 053import org.apache.hadoop.hbase.ipc.NettyRpcServer; 054import org.apache.hadoop.hbase.ipc.RpcServer; 055import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 056import org.apache.hadoop.hbase.ipc.RpcServerFactory; 057import org.apache.hadoop.hbase.ipc.RpcServerInterface; 058import org.apache.hadoop.hbase.ipc.ServerRpcController; 059import org.apache.hadoop.hbase.ipc.SimpleRpcServer; 060import org.apache.hadoop.hbase.log.HBaseMarkers; 061import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; 062import org.apache.hadoop.hbase.regionserver.RegionServerServices; 063import org.apache.hadoop.hbase.security.SecurityInfo; 064import org.apache.hadoop.hbase.security.User; 065import org.apache.hadoop.hbase.testclassification.MediumTests; 066import org.apache.hadoop.hbase.testclassification.SecurityTests; 067import org.apache.hadoop.hbase.util.Bytes; 068import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 069import org.apache.hadoop.hbase.util.Sleeper; 070import org.apache.hadoop.hbase.util.Strings; 071import org.apache.hadoop.hbase.util.Threads; 072import org.apache.hadoop.hbase.util.Writables; 073import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 074import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 075import org.apache.hadoop.net.DNS; 076import org.apache.hadoop.security.authorize.PolicyProvider; 077import org.apache.hadoop.security.authorize.Service; 078import org.apache.hadoop.security.token.SecretManager; 079import 
org.apache.hadoop.security.token.Token; 080import org.apache.hadoop.security.token.TokenIdentifier; 081import org.junit.After; 082import org.junit.Before; 083import org.junit.ClassRule; 084import org.junit.Test; 085import org.junit.experimental.categories.Category; 086import org.junit.runner.RunWith; 087import org.junit.runners.Parameterized; 088import org.junit.runners.Parameterized.Parameter; 089import org.junit.runners.Parameterized.Parameters; 090import org.mockito.Mockito; 091import org.slf4j.Logger; 092import org.slf4j.LoggerFactory; 093 094import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 095import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor; 096import org.apache.hbase.thirdparty.com.google.protobuf.Message; 097 098/** 099 * Tests for authentication token creation and usage 100 */ 101// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer 102// to offer up. It worked fine pre-hbase-2.0.0 but post the shading project, it fails because 103// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded 104// protobufs. Since hbase-2.0.0, we added conversion from shaded to non-shaded so this test keeps 105// working. 
106@RunWith(Parameterized.class) 107@Category({SecurityTests.class, MediumTests.class}) 108public class TestTokenAuthentication { 109 110 @ClassRule 111 public static final HBaseClassTestRule CLASS_RULE = 112 HBaseClassTestRule.forClass(TestTokenAuthentication.class); 113 114 static { 115 // Setting whatever system properties after recommendation from 116 // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html 117 System.setProperty("java.security.krb5.realm", "hbase"); 118 System.setProperty("java.security.krb5.kdc", "blah"); 119 } 120 121 /** 122 * Basic server process for RPC authentication testing 123 */ 124 private static class TokenServer extends TokenProvider implements 125 AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server { 126 private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class); 127 private Configuration conf; 128 private HBaseTestingUtility TEST_UTIL; 129 private RpcServerInterface rpcServer; 130 private InetSocketAddress isa; 131 private ZKWatcher zookeeper; 132 private Sleeper sleeper; 133 private boolean started = false; 134 private boolean aborted = false; 135 private boolean stopped = false; 136 private long startcode; 137 138 public TokenServer(Configuration conf, HBaseTestingUtility TEST_UTIL) throws IOException { 139 this.conf = conf; 140 this.TEST_UTIL = TEST_UTIL; 141 this.startcode = EnvironmentEdgeManager.currentTime(); 142 // Server to handle client requests. 143 String hostname = 144 Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default")); 145 int port = 0; 146 // Creation of an ISA will force a resolve. 
147 InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); 148 if (initialIsa.getAddress() == null) { 149 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 150 } 151 final List<BlockingServiceAndInterface> sai = new ArrayList<>(1); 152 // Make a proxy to go between the shaded Service that rpc expects and the 153 // non-shaded Service this CPEP is providing. This is because this test does a neat 154 // little trick of testing the CPEP Service by inserting it as RpcServer Service. This 155 // worked fine before we shaded PB. Now we need these proxies. 156 final BlockingService service = 157 AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this); 158 final org.apache.hbase.thirdparty.com.google.protobuf.BlockingService proxy = 159 new org.apache.hbase.thirdparty.com.google.protobuf.BlockingService() { 160 @Override 161 public Message callBlockingMethod(MethodDescriptor md, 162 org.apache.hbase.thirdparty.com.google.protobuf.RpcController controller, 163 Message param) 164 throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException { 165 com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor = 166 service.getDescriptorForType().findMethodByName(md.getName()); 167 com.google.protobuf.Message request = service.getRequestPrototype(methodDescriptor); 168 // TODO: Convert rpcController 169 com.google.protobuf.Message response = null; 170 try { 171 response = service.callBlockingMethod(methodDescriptor, null, request); 172 } catch (ServiceException e) { 173 throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e); 174 } 175 return null;// Convert 'response'. 
176 } 177 178 @Override 179 public ServiceDescriptor getDescriptorForType() { 180 return null; 181 } 182 183 @Override 184 public Message getRequestPrototype(MethodDescriptor arg0) { 185 // TODO Auto-generated method stub 186 return null; 187 } 188 189 @Override 190 public Message getResponsePrototype(MethodDescriptor arg0) { 191 // TODO Auto-generated method stub 192 return null; 193 } 194 }; 195 sai.add(new BlockingServiceAndInterface(proxy, 196 AuthenticationProtos.AuthenticationService.BlockingInterface.class)); 197 this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf, 198 new FifoRpcScheduler(conf, 1)); 199 InetSocketAddress address = rpcServer.getListenerAddress(); 200 if (address == null) { 201 throw new IOException("Listener channel is closed"); 202 } 203 this.isa = address; 204 this.sleeper = new Sleeper(1000, this); 205 } 206 207 @Override 208 public Configuration getConfiguration() { 209 return conf; 210 } 211 212 @Override 213 public ClusterConnection getConnection() { 214 return null; 215 } 216 217 @Override 218 public ZKWatcher getZooKeeper() { 219 return zookeeper; 220 } 221 222 @Override 223 public CoordinatedStateManager getCoordinatedStateManager() { 224 return null; 225 } 226 227 @Override 228 public boolean isAborted() { 229 return aborted; 230 } 231 232 @Override 233 public ServerName getServerName() { 234 return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode); 235 } 236 237 @Override 238 public FileSystem getFileSystem() { 239 return null; 240 } 241 242 @Override 243 public boolean isStopping() { 244 return this.stopped; 245 } 246 247 @Override 248 public void abort(String reason, Throwable error) { 249 LOG.error(HBaseMarkers.FATAL, "Aborting on: "+reason, error); 250 this.aborted = true; 251 this.stopped = true; 252 sleeper.skipSleepCycle(); 253 } 254 255 private void initialize() throws IOException { 256 // ZK configuration must _not_ have hbase.security.authentication or it will 
require SASL auth 257 Configuration zkConf = new Configuration(conf); 258 zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); 259 this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(), 260 this, true); 261 this.rpcServer.start(); 262 263 // Mock up region coprocessor environment 264 RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class, 265 Mockito.withSettings().extraInterfaces(HasRegionServerServices.class)); 266 when(mockRegionCpEnv.getConfiguration()).thenReturn(conf); 267 when(mockRegionCpEnv.getClassLoader()).then( 268 (var1) -> Thread.currentThread().getContextClassLoader()); 269 RegionServerServices mockRss = mock(RegionServerServices.class); 270 when(mockRss.getRpcServer()).thenReturn(rpcServer); 271 when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices()) 272 .thenReturn(mockRss); 273 274 super.start(mockRegionCpEnv); 275 started = true; 276 } 277 278 @Override 279 public void run() { 280 try { 281 initialize(); 282 while (!stopped) { 283 this.sleeper.sleep(); 284 } 285 } catch (Exception e) { 286 abort(e.getMessage(), e); 287 } 288 this.rpcServer.stop(); 289 } 290 291 public boolean isStarted() { 292 return started; 293 } 294 295 @Override 296 public void stop(String reason) { 297 LOG.info("Stopping due to: "+reason); 298 this.stopped = true; 299 sleeper.skipSleepCycle(); 300 } 301 302 @Override 303 public boolean isStopped() { 304 return stopped; 305 } 306 307 public InetSocketAddress getAddress() { 308 return isa; 309 } 310 311 public SecretManager<? 
extends TokenIdentifier> getSecretManager() { 312 return ((RpcServer)rpcServer).getSecretManager(); 313 } 314 315 @Override 316 public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken( 317 RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request) 318 throws ServiceException { 319 LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null)); 320 // Ignore above passed in controller -- it is always null 321 ServerRpcController serverController = new ServerRpcController(); 322 final NonShadedBlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> 323 callback = new NonShadedBlockingRpcCallback<>(); 324 getAuthenticationToken(null, request, callback); 325 try { 326 serverController.checkFailed(); 327 return callback.get(); 328 } catch (IOException ioe) { 329 throw new ServiceException(ioe); 330 } 331 } 332 333 @Override 334 public AuthenticationProtos.WhoAmIResponse whoAmI( 335 RpcController controller, AuthenticationProtos.WhoAmIRequest request) 336 throws ServiceException { 337 LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null)); 338 // Ignore above passed in controller -- it is always null 339 ServerRpcController serverController = new ServerRpcController(); 340 NonShadedBlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback = 341 new NonShadedBlockingRpcCallback<>(); 342 whoAmI(null, request, callback); 343 try { 344 serverController.checkFailed(); 345 return callback.get(); 346 } catch (IOException ioe) { 347 throw new ServiceException(ioe); 348 } 349 } 350 351 @Override 352 public ChoreService getChoreService() { 353 return null; 354 } 355 356 @Override 357 public ClusterConnection getClusterConnection() { 358 // TODO Auto-generated method stub 359 return null; 360 } 361 362 @Override 363 public Connection createConnection(Configuration conf) throws IOException { 364 return null; 365 } 366 } 367 368 @Parameters(name = 
"{index}: rpcServerImpl={0}") 369 public static Collection<Object[]> parameters() { 370 return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() }, 371 new Object[] { NettyRpcServer.class.getName() }); 372 } 373 374 @Parameter(0) 375 public String rpcServerImpl; 376 377 private HBaseTestingUtility TEST_UTIL; 378 private TokenServer server; 379 private Thread serverThread; 380 private AuthenticationTokenSecretManager secretManager; 381 private ClusterId clusterId = new ClusterId(); 382 383 @Before 384 public void setUp() throws Exception { 385 TEST_UTIL = new HBaseTestingUtility(); 386 TEST_UTIL.startMiniZKCluster(); 387 // register token type for protocol 388 SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(), 389 new SecurityInfo("hbase.test.kerberos.principal", 390 AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN)); 391 // security settings only added after startup so that ZK does not require SASL 392 Configuration conf = TEST_UTIL.getConfiguration(); 393 conf.set("hadoop.security.authentication", "kerberos"); 394 conf.set("hbase.security.authentication", "kerberos"); 395 conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true); 396 conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl); 397 server = new TokenServer(conf, TEST_UTIL); 398 serverThread = new Thread(server); 399 Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString()); 400 // wait for startup 401 while (!server.isStarted() && !server.isStopped()) { 402 Thread.sleep(10); 403 } 404 server.rpcServer.refreshAuthManager(new PolicyProvider() { 405 @Override 406 public Service[] getServices() { 407 return new Service [] { 408 new Service("security.client.protocol.acl", 409 AuthenticationProtos.AuthenticationService.BlockingInterface.class)}; 410 } 411 }); 412 ZKClusterId.setClusterId(server.getZooKeeper(), clusterId); 413 secretManager = 
(AuthenticationTokenSecretManager)server.getSecretManager(); 414 while(secretManager.getCurrentKey() == null) { 415 Thread.sleep(1); 416 } 417 } 418 419 @After 420 public void tearDown() throws Exception { 421 server.stop("Test complete"); 422 Threads.shutdown(serverThread); 423 TEST_UTIL.shutdownMiniZKCluster(); 424 } 425 426 @Test 427 public void testTokenCreation() throws Exception { 428 Token<AuthenticationTokenIdentifier> token = 429 secretManager.generateToken("testuser"); 430 431 AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier(); 432 Writables.getWritable(token.getIdentifier(), ident); 433 assertEquals("Token username should match", "testuser", 434 ident.getUsername()); 435 byte[] passwd = secretManager.retrievePassword(ident); 436 assertTrue("Token password and password from secret manager should match", 437 Bytes.equals(token.getPassword(), passwd)); 438 } 439// This won't work any more now RpcServer takes Shaded Service. It depends on RPCServer being able to provide a 440// non-shaded service. TODO: FIX. Tried to make RPC generic but then it ripples; have to make Connection generic. 441// And Call generic, etc. 
442// 443// @Test 444// public void testTokenAuthentication() throws Exception { 445// UserGroupInformation testuser = 446// UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"}); 447// testuser.setAuthenticationMethod( 448// UserGroupInformation.AuthenticationMethod.TOKEN); 449// final Configuration conf = TEST_UTIL.getConfiguration(); 450// UserGroupInformation.setConfiguration(conf); 451// Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser"); 452// LOG.debug("Got token: " + token.toString()); 453// testuser.addToken(token); 454// // Verify the server authenticates us as this token user 455// testuser.doAs(new PrivilegedExceptionAction<Object>() { 456// public Object run() throws Exception { 457// Configuration c = server.getConfiguration(); 458// final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString()); 459// ServerName sn = 460// ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(), 461// System.currentTimeMillis()); 462// try { 463// // Make a proxy to go between the shaded RpcController that rpc expects and the 464// // non-shaded controller this CPEP is providing. This is because this test does a neat 465// // little trick of testing the CPEP Service by inserting it as RpcServer Service. This 466// // worked fine before we shaded PB. Now we need these proxies. 
467// final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel = 468// rpcClient.createBlockingRpcChannel(sn, User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 469// AuthenticationProtos.AuthenticationService.BlockingInterface stub = 470// AuthenticationProtos.AuthenticationService.newBlockingStub(channel); 471// AuthenticationProtos.WhoAmIResponse response = 472// stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance()); 473// String myname = response.getUsername(); 474// assertEquals("testuser", myname); 475// String authMethod = response.getAuthMethod(); 476// assertEquals("TOKEN", authMethod); 477// } finally { 478// rpcClient.close(); 479// } 480// return null; 481// } 482// }); 483// } 484 485 @Test 486 public void testUseExistingToken() throws Exception { 487 User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2", 488 new String[]{"testgroup"}); 489 Token<AuthenticationTokenIdentifier> token = 490 secretManager.generateToken(user.getName()); 491 assertNotNull(token); 492 user.addToken(token); 493 494 // make sure we got a token 495 Token<AuthenticationTokenIdentifier> firstToken = 496 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 497 assertNotNull(firstToken); 498 assertEquals(token, firstToken); 499 500 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 501 try { 502 assertFalse(TokenUtil.addTokenIfMissing(conn, user)); 503 // make sure we still have the same token 504 Token<AuthenticationTokenIdentifier> secondToken = 505 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 506 assertEquals(firstToken, secondToken); 507 } finally { 508 conn.close(); 509 } 510 } 511 512 /** 513 * A copy of the BlockingRpcCallback class for use locally. Only difference is that it makes 514 * use of non-shaded protobufs; i.e. 
refers to com.google.protobuf.* rather than to 515 * org.apache.hbase.thirdparty.com.google.protobuf.* 516 */ 517 private static class NonShadedBlockingRpcCallback<R> implements 518 com.google.protobuf.RpcCallback<R> { 519 private R result; 520 private boolean resultSet = false; 521 522 /** 523 * Called on completion of the RPC call with the response object, or {@code null} in the case of 524 * an error. 525 * @param parameter the response object or {@code null} if an error occurred 526 */ 527 @Override 528 public void run(R parameter) { 529 synchronized (this) { 530 result = parameter; 531 resultSet = true; 532 this.notifyAll(); 533 } 534 } 535 536 /** 537 * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was 538 * passed. When used asynchronously, this method will block until the {@link #run(Object)} 539 * method has been called. 540 * @return the response object or {@code null} if no response was passed 541 */ 542 public synchronized R get() throws IOException { 543 while (!resultSet) { 544 try { 545 this.wait(); 546 } catch (InterruptedException ie) { 547 InterruptedIOException exception = new InterruptedIOException(ie.getMessage()); 548 exception.initCause(ie); 549 throw exception; 550 } 551 } 552 return result; 553 } 554 } 555}