001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.provider; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.containsString; 022import static org.junit.jupiter.api.Assertions.assertFalse; 023import static org.junit.jupiter.api.Assertions.assertNotNull; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025import static org.junit.jupiter.api.Assertions.fail; 026 027import java.io.ByteArrayInputStream; 028import java.io.DataInput; 029import java.io.DataInputStream; 030import java.io.DataOutput; 031import java.io.File; 032import java.io.IOException; 033import java.net.InetAddress; 034import java.security.PrivilegedExceptionAction; 035import java.util.Collection; 036import java.util.List; 037import java.util.Map; 038import java.util.Optional; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.stream.Collectors; 041import java.util.stream.Stream; 042import javax.security.auth.callback.Callback; 043import javax.security.auth.callback.CallbackHandler; 044import javax.security.auth.callback.NameCallback; 045import javax.security.auth.callback.PasswordCallback; 046import javax.security.auth.callback.UnsupportedCallbackException; 047import javax.security.sasl.AuthorizeCallback; 048import javax.security.sasl.RealmCallback; 049import javax.security.sasl.RealmChoiceCallback; 050import javax.security.sasl.Sasl; 051import javax.security.sasl.SaslClient; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.hbase.Cell; 055import org.apache.hadoop.hbase.CellUtil; 056import org.apache.hadoop.hbase.HBaseTestingUtil; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.LocalHBaseCluster; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.TableNameTestExtension; 061import org.apache.hadoop.hbase.client.Admin; 062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 063import org.apache.hadoop.hbase.client.Connection; 064import org.apache.hadoop.hbase.client.ConnectionFactory; 065import org.apache.hadoop.hbase.client.Get; 066import org.apache.hadoop.hbase.client.Put; 067import org.apache.hadoop.hbase.client.Result; 068import org.apache.hadoop.hbase.client.RetriesExhaustedException; 069import org.apache.hadoop.hbase.client.Table; 070import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 071import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 072import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; 073import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 074import org.apache.hadoop.hbase.ipc.NettyRpcClient; 075import org.apache.hadoop.hbase.ipc.RpcClientFactory; 076import org.apache.hadoop.hbase.ipc.RpcServerFactory; 077import org.apache.hadoop.hbase.security.AccessDeniedException; 078import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 079import org.apache.hadoop.hbase.security.SaslUtil; 080import org.apache.hadoop.hbase.security.User; 081import org.apache.hadoop.hbase.security.token.SecureTestCluster; 082import org.apache.hadoop.hbase.security.token.TokenProvider; 083import org.apache.hadoop.hbase.util.Bytes; 084import org.apache.hadoop.hbase.util.CommonFSUtils; 085import org.apache.hadoop.hbase.util.Pair; 086import org.apache.hadoop.io.Text; 087import org.apache.hadoop.io.WritableUtils; 088import org.apache.hadoop.minikdc.MiniKdc; 089import org.apache.hadoop.security.UserGroupInformation; 090import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 091import org.apache.hadoop.security.token.SecretManager; 092import org.apache.hadoop.security.token.SecretManager.InvalidToken; 093import org.apache.hadoop.security.token.Token; 094import org.apache.hadoop.security.token.TokenIdentifier; 095import org.junit.jupiter.api.AfterAll; 096import org.junit.jupiter.api.AfterEach; 097import org.junit.jupiter.api.BeforeEach; 098import org.junit.jupiter.api.TestTemplate; 099import org.junit.jupiter.api.extension.RegisterExtension; 100import org.junit.jupiter.params.provider.Arguments; 101import org.slf4j.Logger; 102import org.slf4j.LoggerFactory; 103 104import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 105 106import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 107 108/** 109 * Tests the pluggable authentication framework with SASL using a contrived authentication system. 110 * This tests holds a "user database" in memory as a hashmap. Clients provide their password in the 111 * client Hadoop configuration. The servers validate this password via the "user database". 112 */ 113public abstract class CustomSaslAuthenticationProviderTestBase { 114 115 private static final Logger LOG = 116 LoggerFactory.getLogger(CustomSaslAuthenticationProviderTestBase.class); 117 118 private static final Map<String, String> USER_DATABASE = createUserDatabase(); 119 120 private static final String USER1_PASSWORD = "foobarbaz"; 121 private static final String USER2_PASSWORD = "bazbarfoo"; 122 123 public static Stream<Arguments> parameters() { 124 return Stream.of(Arguments.of(BlockingRpcClient.class.getName()), 125 Arguments.of(NettyRpcClient.class.getName())); 126 } 127 128 private String rpcClientImpl; 129 130 public CustomSaslAuthenticationProviderTestBase(String rpcClientImpl) { 131 this.rpcClientImpl = rpcClientImpl; 132 } 133 134 private static Map<String, String> createUserDatabase() { 135 Map<String, String> db = new ConcurrentHashMap<>(); 136 db.put("user1", USER1_PASSWORD); 137 db.put("user2", USER2_PASSWORD); 138 return db; 139 } 140 141 public static String getPassword(String user) { 142 String password = USER_DATABASE.get(user); 143 if (password == null) { 144 throw new IllegalStateException("Cannot request password for a user that doesn't exist"); 145 } 146 return password; 147 } 148 149 /** 150 * A custom token identifier for our custom auth'n method. Unique from the TokenIdentifier used 151 * for delegation tokens. 152 */ 153 public static class PasswordAuthTokenIdentifier extends TokenIdentifier { 154 public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN"); 155 private String username; 156 157 public PasswordAuthTokenIdentifier() { 158 } 159 160 public PasswordAuthTokenIdentifier(String username) { 161 this.username = username; 162 } 163 164 @Override 165 public void readFields(DataInput in) throws IOException { 166 this.username = WritableUtils.readString(in); 167 } 168 169 @Override 170 public void write(DataOutput out) throws IOException { 171 WritableUtils.writeString(out, username); 172 } 173 174 @Override 175 public Text getKind() { 176 return PASSWORD_AUTH_TOKEN; 177 } 178 179 @Override 180 public UserGroupInformation getUser() { 181 if (username == null || "".equals(username)) { 182 return null; 183 } 184 return UserGroupInformation.createRemoteUser(username); 185 } 186 } 187 188 public static Token<? extends TokenIdentifier> createPasswordToken(String username, 189 String password, String clusterId) { 190 PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(username); 191 Token<? extends TokenIdentifier> token = 192 new Token<>(id.getBytes(), Bytes.toBytes(password), id.getKind(), new Text(clusterId)); 193 return token; 194 } 195 196 /** 197 * Client provider that finds custom Token in the user's UGI and authenticates with the server via 198 * DIGEST-MD5 using that password. 199 */ 200 public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider { 201 public static final String MECHANISM = "DIGEST-MD5"; 202 public static final SaslAuthMethod SASL_AUTH_METHOD = 203 new SaslAuthMethod("IN_MEMORY", (byte) 42, MECHANISM, AuthenticationMethod.TOKEN); 204 205 @Override 206 public SaslClient createClient(Configuration conf, InetAddress serverAddr, 207 String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, 208 Map<String, String> saslProps) throws IOException { 209 return Sasl.createSaslClient(new String[] { MECHANISM }, null, null, 210 SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token)); 211 } 212 213 public Optional<Token<? extends TokenIdentifier>> findToken(User user) { 214 List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream() 215 .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN)) 216 .collect(Collectors.toList()); 217 if (tokens.isEmpty()) { 218 return Optional.empty(); 219 } 220 if (tokens.size() > 1) { 221 throw new IllegalStateException("Cannot handle more than one PasswordAuthToken"); 222 } 223 return Optional.of(tokens.get(0)); 224 } 225 226 @Override 227 public SaslAuthMethod getSaslAuthMethod() { 228 return SASL_AUTH_METHOD; 229 } 230 231 /** 232 * Sasl CallbackHandler which extracts information from our custom token and places it into the 233 * Sasl objects. 234 */ 235 public class InMemoryClientProviderCallbackHandler implements CallbackHandler { 236 private final Token<? extends TokenIdentifier> token; 237 238 public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) { 239 this.token = token; 240 } 241 242 @Override 243 public void handle(Callback[] callbacks) throws UnsupportedCallbackException { 244 NameCallback nc = null; 245 PasswordCallback pc = null; 246 RealmCallback rc = null; 247 for (Callback callback : callbacks) { 248 if (callback instanceof RealmChoiceCallback) { 249 continue; 250 } else if (callback instanceof NameCallback) { 251 nc = (NameCallback) callback; 252 } else if (callback instanceof PasswordCallback) { 253 pc = (PasswordCallback) callback; 254 } else if (callback instanceof RealmCallback) { 255 rc = (RealmCallback) callback; 256 } else { 257 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 258 } 259 } 260 if (nc != null) { 261 nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier())); 262 } 263 if (pc != null) { 264 pc.setPassword(SaslUtil.encodePassword(token.getPassword())); 265 } 266 if (rc != null) { 267 rc.setText(rc.getDefaultText()); 268 } 269 } 270 } 271 272 @Override 273 public UserInformation getUserInfo(User user) { 274 return null; 275 } 276 } 277 278 /** 279 * Server provider which validates credentials from an in-memory database. 280 */ 281 public static class InMemoryServerProvider extends InMemoryClientProvider 282 implements SaslServerAuthenticationProvider { 283 284 @Override 285 public AttemptingUserProvidingSaslServer 286 createServer(SecretManager<TokenIdentifier> secretManager, Map<String, String> saslProps) 287 throws IOException { 288 return new AttemptingUserProvidingSaslServer( 289 Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null, 290 SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()), 291 () -> null); 292 } 293 294 /** 295 * Pulls the correct password for the user who started the SASL handshake so that SASL can 296 * validate that the user provided the right password. 297 */ 298 private class InMemoryServerProviderCallbackHandler implements CallbackHandler { 299 300 @Override 301 public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException { 302 NameCallback nc = null; 303 PasswordCallback pc = null; 304 AuthorizeCallback ac = null; 305 for (Callback callback : callbacks) { 306 if (callback instanceof AuthorizeCallback) { 307 ac = (AuthorizeCallback) callback; 308 } else if (callback instanceof NameCallback) { 309 nc = (NameCallback) callback; 310 } else if (callback instanceof PasswordCallback) { 311 pc = (PasswordCallback) callback; 312 } else if (callback instanceof RealmCallback) { 313 continue; // realm is ignored 314 } else { 315 throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback"); 316 } 317 } 318 if (nc != null && pc != null) { 319 byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName()); 320 PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(); 321 try { 322 id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId))); 323 } catch (IOException e) { 324 throw (InvalidToken) new InvalidToken("Can't de-serialize tokenIdentifier") 325 .initCause(e); 326 } 327 char[] actualPassword = 328 SaslUtil.encodePassword(Bytes.toBytes(getPassword(id.getUser().getUserName()))); 329 pc.setPassword(actualPassword); 330 } 331 if (ac != null) { 332 String authid = ac.getAuthenticationID(); 333 String authzid = ac.getAuthorizationID(); 334 if (authid.equals(authzid)) { 335 ac.setAuthorized(true); 336 } else { 337 ac.setAuthorized(false); 338 } 339 if (ac.isAuthorized()) { 340 ac.setAuthorizedID(authzid); 341 } 342 } 343 } 344 } 345 346 @Override 347 public boolean supportsProtocolAuthentication() { 348 return false; 349 } 350 351 @Override 352 public UserGroupInformation getAuthorizedUgi(String authzId, 353 SecretManager<TokenIdentifier> secretManager) throws IOException { 354 UserGroupInformation authorizedUgi; 355 byte[] encodedId = SaslUtil.decodeIdentifier(authzId); 356 PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier(); 357 try { 358 tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId))); 359 } catch (IOException e) { 360 throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e); 361 } 362 authorizedUgi = tokenId.getUser(); 363 if (authorizedUgi == null) { 364 throw new AccessDeniedException("Can't retrieve username from tokenIdentifier."); 365 } 366 authorizedUgi.addTokenIdentifier(tokenId); 367 authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod()); 368 return authorizedUgi; 369 } 370 } 371 372 /** 373 * Custom provider which can select our custom provider, amongst other tokens which may be 374 * available. 375 */ 376 public static class InMemoryProviderSelector extends BuiltInProviderSelector { 377 private InMemoryClientProvider inMemoryProvider; 378 379 @Override 380 public void configure(Configuration conf, 381 Collection<SaslClientAuthenticationProvider> providers) { 382 super.configure(conf, providers); 383 Optional<SaslClientAuthenticationProvider> o = 384 providers.stream().filter((p) -> p instanceof InMemoryClientProvider).findAny(); 385 386 inMemoryProvider = (InMemoryClientProvider) o.orElseThrow(() -> new RuntimeException( 387 "InMemoryClientProvider not found in available providers: " + providers)); 388 } 389 390 @Override 391 public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> 392 selectProvider(String clusterId, User user) { 393 Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> superPair = 394 super.selectProvider(clusterId, user); 395 396 Optional<Token<? extends TokenIdentifier>> optional = inMemoryProvider.findToken(user); 397 if (optional.isPresent()) { 398 LOG.info("Using InMemoryClientProvider"); 399 return new Pair<>(inMemoryProvider, optional.get()); 400 } 401 402 LOG.info("InMemoryClientProvider not usable, falling back to {}", superPair); 403 return superPair; 404 } 405 } 406 407 private static void createBaseCluster(HBaseTestingUtil util, File keytabFile, MiniKdc kdc) 408 throws Exception { 409 String servicePrincipal = "hbase/localhost"; 410 String spnegoPrincipal = "HTTP/localhost"; 411 kdc.createPrincipal(keytabFile, servicePrincipal); 412 util.startMiniZKCluster(); 413 414 HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(), 415 servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm()); 416 HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class); 417 418 util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 419 TokenProvider.class.getName()); 420 util.startMiniDFSCluster(1); 421 Path rootdir = util.getDataTestDirOnTestFS("TestCustomSaslAuthenticationProvider"); 422 CommonFSUtils.setRootDir(util.getConfiguration(), rootdir); 423 } 424 425 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 426 private static final Configuration CONF = UTIL.getConfiguration(); 427 private static LocalHBaseCluster CLUSTER; 428 private static File KEYTAB_FILE; 429 430 protected static void startCluster(String rpcServerImpl) throws Exception { 431 KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath()); 432 final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE); 433 434 // Adds our test impls instead of creating service loader entries which 435 // might inadvertently get them loaded on a real cluster. 436 CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY, 437 InMemoryClientProvider.class.getName()); 438 CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY, 439 InMemoryServerProvider.class.getName()); 440 CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY, 441 InMemoryProviderSelector.class.getName()); 442 createBaseCluster(UTIL, KEYTAB_FILE, kdc); 443 CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl); 444 CLUSTER = new LocalHBaseCluster(CONF, 1); 445 CLUSTER.startup(); 446 } 447 448 @AfterAll 449 public static void shutdownCluster() throws Exception { 450 if (CLUSTER != null) { 451 CLUSTER.shutdown(); 452 CLUSTER = null; 453 } 454 UTIL.shutdownMiniDFSCluster(); 455 UTIL.shutdownMiniZKCluster(); 456 UTIL.cleanupTestDir(); 457 } 458 459 @BeforeEach 460 public void setUp() throws Exception { 461 createTable(); 462 } 463 464 @AfterEach 465 public void tearDown() throws IOException { 466 UTIL.deleteTable(name.getTableName()); 467 } 468 469 @RegisterExtension 470 private final TableNameTestExtension name = new TableNameTestExtension(); 471 472 private TableName tableName; 473 474 private String clusterId; 475 476 private void createTable() throws Exception { 477 tableName = name.getTableName(); 478 479 // Create a table and write a record as the service user (hbase) 480 UserGroupInformation serviceUgi = UserGroupInformation 481 .loginUserFromKeytabAndReturnUGI("hbase/localhost", KEYTAB_FILE.getAbsolutePath()); 482 clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() { 483 @Override 484 public String run() throws Exception { 485 try (Connection conn = ConnectionFactory.createConnection(CONF); 486 Admin admin = conn.getAdmin();) { 487 admin.createTable(TableDescriptorBuilder.newBuilder(tableName) 488 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1")).build()); 489 490 UTIL.waitTableAvailable(tableName); 491 492 try (Table t = conn.getTable(tableName)) { 493 Put p = new Put(Bytes.toBytes("r1")); 494 p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1")); 495 t.put(p); 496 } 497 498 return admin.getClusterMetrics().getClusterId(); 499 } 500 } 501 }); 502 assertNotNull(clusterId); 503 } 504 505 private Configuration getClientConf() { 506 Configuration conf = new Configuration(CONF); 507 conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl); 508 return conf; 509 } 510 511 @TestTemplate 512 public void testPositiveAuthentication() throws Exception { 513 // Validate that we can read that record back out as the user with our custom auth'n 514 UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]); 515 user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId)); 516 user1.doAs(new PrivilegedExceptionAction<Void>() { 517 @Override 518 public Void run() throws Exception { 519 try (Connection conn = ConnectionFactory.createConnection(getClientConf()); 520 Table t = conn.getTable(tableName)) { 521 Result r = t.get(new Get(Bytes.toBytes("r1"))); 522 assertNotNull(r); 523 assertFalse(r.isEmpty(), "Should have read a non-empty Result"); 524 final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1")); 525 assertTrue(CellUtil.matchingValue(cell, Bytes.toBytes("1")), "Unexpected value"); 526 527 return null; 528 } 529 } 530 }); 531 } 532 533 @TestTemplate 534 public void testNegativeAuthentication() throws Exception { 535 // Validate that we can read that record back out as the user with our custom auth'n 536 UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]); 537 user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId)); 538 user1.doAs(new PrivilegedExceptionAction<Void>() { 539 @Override 540 public Void run() throws Exception { 541 Configuration clientConf = getClientConf(); 542 clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 543 // Depending on the registry in use, the following code can throw exceptions at different 544 // places. Master registry fails at the createConnection() step because the RPC to the 545 // master fails with sasl auth. With ZK registry, connection creation succeeds (since there 546 // is no RPC to HBase services involved) but the subsequent get() fails. The root cause 547 // should still be a SaslException in both the cases. 548 try (Connection conn = ConnectionFactory.createConnection(clientConf); 549 Table t = conn.getTable(tableName)) { 550 t.get(new Get(Bytes.toBytes("r1"))); 551 fail("Should not successfully authenticate with HBase"); 552 } catch (MasterRegistryFetchException mfe) { 553 Throwable cause = mfe.getCause(); 554 assertThat(cause.getMessage(), containsString("SaslException")); 555 } catch (RetriesExhaustedException re) { 556 assertThat(re.getMessage(), containsString("SaslException")); 557 } catch (Exception e) { 558 // Any other exception is unexpected. 559 fail("Unexpected exception caught, was expecting a authentication error: " 560 + Throwables.getStackTraceAsString(e)); 561 } 562 return null; 563 } 564 }); 565 } 566}