001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.provider; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.ByteArrayInputStream; 026import java.io.DataInput; 027import java.io.DataInputStream; 028import java.io.DataOutput; 029import java.io.File; 030import java.io.IOException; 031import java.net.InetAddress; 032import java.security.PrivilegedExceptionAction; 033import java.util.Arrays; 034import java.util.Collection; 035import java.util.List; 036import java.util.Map; 037import java.util.Optional; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.stream.Collectors; 040import javax.security.auth.callback.Callback; 041import javax.security.auth.callback.CallbackHandler; 042import javax.security.auth.callback.NameCallback; 043import javax.security.auth.callback.PasswordCallback; 044import javax.security.auth.callback.UnsupportedCallbackException; 045import javax.security.sasl.AuthorizeCallback; 046import javax.security.sasl.RealmCallback; 047import javax.security.sasl.RealmChoiceCallback; 048import javax.security.sasl.Sasl; 049import javax.security.sasl.SaslClient; 050import org.apache.hadoop.conf.Configuration; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.CellUtil; 054import org.apache.hadoop.hbase.HBaseTestingUtil; 055import org.apache.hadoop.hbase.HConstants; 056import org.apache.hadoop.hbase.LocalHBaseCluster; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.TableNameTestRule; 059import org.apache.hadoop.hbase.client.Admin; 060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 061import org.apache.hadoop.hbase.client.Connection; 062import org.apache.hadoop.hbase.client.ConnectionFactory; 063import org.apache.hadoop.hbase.client.Get; 064import org.apache.hadoop.hbase.client.Put; 065import org.apache.hadoop.hbase.client.Result; 066import org.apache.hadoop.hbase.client.RetriesExhaustedException; 067import org.apache.hadoop.hbase.client.Table; 068import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 069import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 070import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; 071import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 072import org.apache.hadoop.hbase.ipc.NettyRpcClient; 073import org.apache.hadoop.hbase.ipc.RpcClientFactory; 074import org.apache.hadoop.hbase.ipc.RpcServerFactory; 075import org.apache.hadoop.hbase.security.AccessDeniedException; 076import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 077import org.apache.hadoop.hbase.security.SaslUtil; 078import org.apache.hadoop.hbase.security.User; 079import org.apache.hadoop.hbase.security.token.SecureTestCluster; 080import org.apache.hadoop.hbase.security.token.TokenProvider; 081import org.apache.hadoop.hbase.util.Bytes; 082import org.apache.hadoop.hbase.util.CommonFSUtils; 083import org.apache.hadoop.hbase.util.Pair; 084import org.apache.hadoop.io.Text; 085import org.apache.hadoop.io.WritableUtils; 086import org.apache.hadoop.minikdc.MiniKdc; 087import org.apache.hadoop.security.UserGroupInformation; 088import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 089import org.apache.hadoop.security.token.SecretManager; 090import org.apache.hadoop.security.token.SecretManager.InvalidToken; 091import org.apache.hadoop.security.token.Token; 092import org.apache.hadoop.security.token.TokenIdentifier; 093import org.junit.After; 094import org.junit.AfterClass; 095import org.junit.Before; 096import org.junit.Rule; 097import org.junit.Test; 098import org.junit.runners.Parameterized.Parameter; 099import org.junit.runners.Parameterized.Parameters; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 104 105import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 106 107/** 108 * Tests the pluggable authentication framework with SASL using a contrived authentication system. 109 * This tests holds a "user database" in memory as a hashmap. Clients provide their password in the 110 * client Hadoop configuration. The servers validate this password via the "user database". 111 */ 112public abstract class CustomSaslAuthenticationProviderTestBase { 113 114 private static final Logger LOG = 115 LoggerFactory.getLogger(CustomSaslAuthenticationProviderTestBase.class); 116 117 private static final Map<String, String> USER_DATABASE = createUserDatabase(); 118 119 private static final String USER1_PASSWORD = "foobarbaz"; 120 private static final String USER2_PASSWORD = "bazbarfoo"; 121 122 @Parameters 123 public static Collection<Object[]> parameters() { 124 return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() }, 125 new Object[] { NettyRpcClient.class.getName() }); 126 } 127 128 @Parameter 129 public String rpcClientImpl; 130 131 private static Map<String, String> createUserDatabase() { 132 Map<String, String> db = new ConcurrentHashMap<>(); 133 db.put("user1", USER1_PASSWORD); 134 db.put("user2", USER2_PASSWORD); 135 return db; 136 } 137 138 public static String getPassword(String user) { 139 String password = USER_DATABASE.get(user); 140 if (password == null) { 141 throw new IllegalStateException("Cannot request password for a user that doesn't exist"); 142 } 143 return password; 144 } 145 146 /** 147 * A custom token identifier for our custom auth'n method. Unique from the TokenIdentifier used 148 * for delegation tokens. 149 */ 150 public static class PasswordAuthTokenIdentifier extends TokenIdentifier { 151 public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN"); 152 private String username; 153 154 public PasswordAuthTokenIdentifier() { 155 } 156 157 public PasswordAuthTokenIdentifier(String username) { 158 this.username = username; 159 } 160 161 @Override 162 public void readFields(DataInput in) throws IOException { 163 this.username = WritableUtils.readString(in); 164 } 165 166 @Override 167 public void write(DataOutput out) throws IOException { 168 WritableUtils.writeString(out, username); 169 } 170 171 @Override 172 public Text getKind() { 173 return PASSWORD_AUTH_TOKEN; 174 } 175 176 @Override 177 public UserGroupInformation getUser() { 178 if (username == null || "".equals(username)) { 179 return null; 180 } 181 return UserGroupInformation.createRemoteUser(username); 182 } 183 } 184 185 public static Token<? extends TokenIdentifier> createPasswordToken(String username, 186 String password, String clusterId) { 187 PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(username); 188 Token<? extends TokenIdentifier> token = 189 new Token<>(id.getBytes(), Bytes.toBytes(password), id.getKind(), new Text(clusterId)); 190 return token; 191 } 192 193 /** 194 * Client provider that finds custom Token in the user's UGI and authenticates with the server via 195 * DIGEST-MD5 using that password. 196 */ 197 public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider { 198 public static final String MECHANISM = "DIGEST-MD5"; 199 public static final SaslAuthMethod SASL_AUTH_METHOD = 200 new SaslAuthMethod("IN_MEMORY", (byte) 42, MECHANISM, AuthenticationMethod.TOKEN); 201 202 @Override 203 public SaslClient createClient(Configuration conf, InetAddress serverAddr, 204 String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, 205 Map<String, String> saslProps) throws IOException { 206 return Sasl.createSaslClient(new String[] { MECHANISM }, null, null, 207 SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token)); 208 } 209 210 public Optional<Token<? extends TokenIdentifier>> findToken(User user) { 211 List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream() 212 .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN)) 213 .collect(Collectors.toList()); 214 if (tokens.isEmpty()) { 215 return Optional.empty(); 216 } 217 if (tokens.size() > 1) { 218 throw new IllegalStateException("Cannot handle more than one PasswordAuthToken"); 219 } 220 return Optional.of(tokens.get(0)); 221 } 222 223 @Override 224 public SaslAuthMethod getSaslAuthMethod() { 225 return SASL_AUTH_METHOD; 226 } 227 228 /** 229 * Sasl CallbackHandler which extracts information from our custom token and places it into the 230 * Sasl objects. 231 */ 232 public class InMemoryClientProviderCallbackHandler implements CallbackHandler { 233 private final Token<? extends TokenIdentifier> token; 234 235 public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) { 236 this.token = token; 237 } 238 239 @Override 240 public void handle(Callback[] callbacks) throws UnsupportedCallbackException { 241 NameCallback nc = null; 242 PasswordCallback pc = null; 243 RealmCallback rc = null; 244 for (Callback callback : callbacks) { 245 if (callback instanceof RealmChoiceCallback) { 246 continue; 247 } else if (callback instanceof NameCallback) { 248 nc = (NameCallback) callback; 249 } else if (callback instanceof PasswordCallback) { 250 pc = (PasswordCallback) callback; 251 } else if (callback instanceof RealmCallback) { 252 rc = (RealmCallback) callback; 253 } else { 254 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 255 } 256 } 257 if (nc != null) { 258 nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier())); 259 } 260 if (pc != null) { 261 pc.setPassword(SaslUtil.encodePassword(token.getPassword())); 262 } 263 if (rc != null) { 264 rc.setText(rc.getDefaultText()); 265 } 266 } 267 } 268 269 @Override 270 public UserInformation getUserInfo(User user) { 271 return null; 272 } 273 } 274 275 /** 276 * Server provider which validates credentials from an in-memory database. 277 */ 278 public static class InMemoryServerProvider extends InMemoryClientProvider 279 implements SaslServerAuthenticationProvider { 280 281 @Override 282 public AttemptingUserProvidingSaslServer 283 createServer(SecretManager<TokenIdentifier> secretManager, Map<String, String> saslProps) 284 throws IOException { 285 return new AttemptingUserProvidingSaslServer( 286 Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null, 287 SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()), 288 () -> null); 289 } 290 291 /** 292 * Pulls the correct password for the user who started the SASL handshake so that SASL can 293 * validate that the user provided the right password. 294 */ 295 private class InMemoryServerProviderCallbackHandler implements CallbackHandler { 296 297 @Override 298 public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException { 299 NameCallback nc = null; 300 PasswordCallback pc = null; 301 AuthorizeCallback ac = null; 302 for (Callback callback : callbacks) { 303 if (callback instanceof AuthorizeCallback) { 304 ac = (AuthorizeCallback) callback; 305 } else if (callback instanceof NameCallback) { 306 nc = (NameCallback) callback; 307 } else if (callback instanceof PasswordCallback) { 308 pc = (PasswordCallback) callback; 309 } else if (callback instanceof RealmCallback) { 310 continue; // realm is ignored 311 } else { 312 throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback"); 313 } 314 } 315 if (nc != null && pc != null) { 316 byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName()); 317 PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(); 318 try { 319 id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId))); 320 } catch (IOException e) { 321 throw (InvalidToken) new InvalidToken("Can't de-serialize tokenIdentifier") 322 .initCause(e); 323 } 324 char[] actualPassword = 325 SaslUtil.encodePassword(Bytes.toBytes(getPassword(id.getUser().getUserName()))); 326 pc.setPassword(actualPassword); 327 } 328 if (ac != null) { 329 String authid = ac.getAuthenticationID(); 330 String authzid = ac.getAuthorizationID(); 331 if (authid.equals(authzid)) { 332 ac.setAuthorized(true); 333 } else { 334 ac.setAuthorized(false); 335 } 336 if (ac.isAuthorized()) { 337 ac.setAuthorizedID(authzid); 338 } 339 } 340 } 341 } 342 343 @Override 344 public boolean supportsProtocolAuthentication() { 345 return false; 346 } 347 348 @Override 349 public UserGroupInformation getAuthorizedUgi(String authzId, 350 SecretManager<TokenIdentifier> secretManager) throws IOException { 351 UserGroupInformation authorizedUgi; 352 byte[] encodedId = SaslUtil.decodeIdentifier(authzId); 353 PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier(); 354 try { 355 tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId))); 356 } catch (IOException e) { 357 throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e); 358 } 359 authorizedUgi = tokenId.getUser(); 360 if (authorizedUgi == null) { 361 throw new AccessDeniedException("Can't retrieve username from tokenIdentifier."); 362 } 363 authorizedUgi.addTokenIdentifier(tokenId); 364 authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod()); 365 return authorizedUgi; 366 } 367 } 368 369 /** 370 * Custom provider which can select our custom provider, amongst other tokens which may be 371 * available. 372 */ 373 public static class InMemoryProviderSelector extends BuiltInProviderSelector { 374 private InMemoryClientProvider inMemoryProvider; 375 376 @Override 377 public void configure(Configuration conf, 378 Collection<SaslClientAuthenticationProvider> providers) { 379 super.configure(conf, providers); 380 Optional<SaslClientAuthenticationProvider> o = 381 providers.stream().filter((p) -> p instanceof InMemoryClientProvider).findAny(); 382 383 inMemoryProvider = (InMemoryClientProvider) o.orElseThrow(() -> new RuntimeException( 384 "InMemoryClientProvider not found in available providers: " + providers)); 385 } 386 387 @Override 388 public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> 389 selectProvider(String clusterId, User user) { 390 Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> superPair = 391 super.selectProvider(clusterId, user); 392 393 Optional<Token<? extends TokenIdentifier>> optional = inMemoryProvider.findToken(user); 394 if (optional.isPresent()) { 395 LOG.info("Using InMemoryClientProvider"); 396 return new Pair<>(inMemoryProvider, optional.get()); 397 } 398 399 LOG.info("InMemoryClientProvider not usable, falling back to {}", superPair); 400 return superPair; 401 } 402 } 403 404 private static void createBaseCluster(HBaseTestingUtil util, File keytabFile, MiniKdc kdc) 405 throws Exception { 406 String servicePrincipal = "hbase/localhost"; 407 String spnegoPrincipal = "HTTP/localhost"; 408 kdc.createPrincipal(keytabFile, servicePrincipal); 409 util.startMiniZKCluster(); 410 411 HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(), 412 servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm()); 413 HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class); 414 415 util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 416 TokenProvider.class.getName()); 417 util.startMiniDFSCluster(1); 418 Path rootdir = util.getDataTestDirOnTestFS("TestCustomSaslAuthenticationProvider"); 419 CommonFSUtils.setRootDir(util.getConfiguration(), rootdir); 420 } 421 422 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 423 private static final Configuration CONF = UTIL.getConfiguration(); 424 private static LocalHBaseCluster CLUSTER; 425 private static File KEYTAB_FILE; 426 427 protected static void startCluster(String rpcServerImpl) throws Exception { 428 KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath()); 429 final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE); 430 431 // Adds our test impls instead of creating service loader entries which 432 // might inadvertently get them loaded on a real cluster. 433 CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY, 434 InMemoryClientProvider.class.getName()); 435 CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY, 436 InMemoryServerProvider.class.getName()); 437 CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY, 438 InMemoryProviderSelector.class.getName()); 439 createBaseCluster(UTIL, KEYTAB_FILE, kdc); 440 CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl); 441 CLUSTER = new LocalHBaseCluster(CONF, 1); 442 CLUSTER.startup(); 443 } 444 445 @AfterClass 446 public static void shutdownCluster() throws Exception { 447 if (CLUSTER != null) { 448 CLUSTER.shutdown(); 449 CLUSTER = null; 450 } 451 UTIL.shutdownMiniDFSCluster(); 452 UTIL.shutdownMiniZKCluster(); 453 UTIL.cleanupTestDir(); 454 } 455 456 @Before 457 public void setUp() throws Exception { 458 createTable(); 459 } 460 461 @After 462 public void tearDown() throws IOException { 463 UTIL.deleteTable(name.getTableName()); 464 } 465 466 @Rule 467 public TableNameTestRule name = new TableNameTestRule(); 468 469 private TableName tableName; 470 471 private String clusterId; 472 473 private void createTable() throws Exception { 474 tableName = name.getTableName(); 475 476 // Create a table and write a record as the service user (hbase) 477 UserGroupInformation serviceUgi = UserGroupInformation 478 .loginUserFromKeytabAndReturnUGI("hbase/localhost", KEYTAB_FILE.getAbsolutePath()); 479 clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() { 480 @Override 481 public String run() throws Exception { 482 try (Connection conn = ConnectionFactory.createConnection(CONF); 483 Admin admin = conn.getAdmin();) { 484 admin.createTable(TableDescriptorBuilder.newBuilder(tableName) 485 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1")).build()); 486 487 UTIL.waitTableAvailable(tableName); 488 489 try (Table t = conn.getTable(tableName)) { 490 Put p = new Put(Bytes.toBytes("r1")); 491 p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1")); 492 t.put(p); 493 } 494 495 return admin.getClusterMetrics().getClusterId(); 496 } 497 } 498 }); 499 assertNotNull(clusterId); 500 } 501 502 private Configuration getClientConf() { 503 Configuration conf = new Configuration(CONF); 504 conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl); 505 return conf; 506 } 507 508 @Test 509 public void testPositiveAuthentication() throws Exception { 510 // Validate that we can read that record back out as the user with our custom auth'n 511 UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]); 512 user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId)); 513 user1.doAs(new PrivilegedExceptionAction<Void>() { 514 @Override 515 public Void run() throws Exception { 516 try (Connection conn = ConnectionFactory.createConnection(getClientConf()); 517 Table t = conn.getTable(tableName)) { 518 Result r = t.get(new Get(Bytes.toBytes("r1"))); 519 assertNotNull(r); 520 assertFalse("Should have read a non-empty Result", r.isEmpty()); 521 final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1")); 522 assertTrue("Unexpected value", CellUtil.matchingValue(cell, Bytes.toBytes("1"))); 523 524 return null; 525 } 526 } 527 }); 528 } 529 530 @Test 531 public void testNegativeAuthentication() throws Exception { 532 // Validate that we can read that record back out as the user with our custom auth'n 533 UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]); 534 user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId)); 535 user1.doAs(new PrivilegedExceptionAction<Void>() { 536 @Override 537 public Void run() throws Exception { 538 Configuration clientConf = getClientConf(); 539 clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 540 // Depending on the registry in use, the following code can throw exceptions at different 541 // places. Master registry fails at the createConnection() step because the RPC to the 542 // master fails with sasl auth. With ZK registry, connection creation succeeds (since there 543 // is no RPC to HBase services involved) but the subsequent get() fails. The root cause 544 // should still be a SaslException in both the cases. 545 try (Connection conn = ConnectionFactory.createConnection(clientConf); 546 Table t = conn.getTable(tableName)) { 547 t.get(new Get(Bytes.toBytes("r1"))); 548 fail("Should not successfully authenticate with HBase"); 549 } catch (MasterRegistryFetchException mfe) { 550 Throwable cause = mfe.getCause(); 551 assertTrue(cause.getMessage(), cause.getMessage().contains("SaslException")); 552 } catch (RetriesExhaustedException re) { 553 assertTrue(re.getMessage(), re.getMessage().contains("SaslException")); 554 } catch (Exception e) { 555 // Any other exception is unexpected. 556 fail("Unexpected exception caught, was expecting a authentication error: " 557 + Throwables.getStackTraceAsString(e)); 558 } 559 return null; 560 } 561 }); 562 } 563}