001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.provider; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.ByteArrayInputStream; 027import java.io.DataInput; 028import java.io.DataInputStream; 029import java.io.DataOutput; 030import java.io.File; 031import java.io.IOException; 032import java.net.InetAddress; 033import java.security.PrivilegedExceptionAction; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.List; 037import java.util.Map; 038import java.util.Optional; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.stream.Collectors; 041import javax.security.auth.callback.Callback; 042import javax.security.auth.callback.CallbackHandler; 043import javax.security.auth.callback.NameCallback; 044import javax.security.auth.callback.PasswordCallback; 045import javax.security.auth.callback.UnsupportedCallbackException; 046import javax.security.sasl.AuthorizeCallback; 047import javax.security.sasl.RealmCallback; 048import javax.security.sasl.RealmChoiceCallback; 049import javax.security.sasl.Sasl; 050import javax.security.sasl.SaslClient; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.fs.Path; 053import org.apache.hadoop.hbase.Cell; 054import org.apache.hadoop.hbase.CellUtil; 055import org.apache.hadoop.hbase.HBaseTestingUtility; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.LocalHBaseCluster; 058import org.apache.hadoop.hbase.TableName; 059import org.apache.hadoop.hbase.TableNameTestRule; 060import org.apache.hadoop.hbase.client.Admin; 061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 062import org.apache.hadoop.hbase.client.Connection; 063import org.apache.hadoop.hbase.client.ConnectionFactory; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Put; 066import org.apache.hadoop.hbase.client.Result; 067import org.apache.hadoop.hbase.client.RetriesExhaustedException; 068import org.apache.hadoop.hbase.client.Table; 069import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 070import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 071import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 072import org.apache.hadoop.hbase.ipc.NettyRpcClient; 073import org.apache.hadoop.hbase.ipc.RpcClientFactory; 074import org.apache.hadoop.hbase.ipc.RpcServerFactory; 075import org.apache.hadoop.hbase.security.AccessDeniedException; 076import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 077import org.apache.hadoop.hbase.security.SaslUtil; 078import org.apache.hadoop.hbase.security.SecurityInfo; 079import org.apache.hadoop.hbase.security.User; 080import org.apache.hadoop.hbase.security.token.SecureTestCluster; 081import org.apache.hadoop.hbase.security.token.TokenProvider; 082import org.apache.hadoop.hbase.util.Bytes; 083import org.apache.hadoop.hbase.util.CommonFSUtils; 084import org.apache.hadoop.hbase.util.Pair; 085import org.apache.hadoop.io.Text; 086import org.apache.hadoop.io.WritableUtils; 087import org.apache.hadoop.minikdc.MiniKdc; 088import org.apache.hadoop.security.UserGroupInformation; 089import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 090import org.apache.hadoop.security.token.SecretManager; 091import org.apache.hadoop.security.token.SecretManager.InvalidToken; 092import org.apache.hadoop.security.token.Token; 093import org.apache.hadoop.security.token.TokenIdentifier; 094import org.junit.After; 095import org.junit.AfterClass; 096import org.junit.Before; 097import org.junit.Rule; 098import org.junit.Test; 099import org.junit.runners.Parameterized.Parameter; 100import org.junit.runners.Parameterized.Parameters; 101import org.slf4j.Logger; 102import org.slf4j.LoggerFactory; 103 104import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 105 106import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 107 108/** 109 * Tests the pluggable authentication framework with SASL using a contrived authentication system. 110 * This tests holds a "user database" in memory as a hashmap. Clients provide their password in the 111 * client Hadoop configuration. The servers validate this password via the "user database". 112 */ 113public abstract class CustomSaslAuthenticationProviderTestBase { 114 115 private static final Logger LOG = 116 LoggerFactory.getLogger(CustomSaslAuthenticationProviderTestBase.class); 117 118 private static final Map<String, String> USER_DATABASE = createUserDatabase(); 119 120 private static final String USER1_PASSWORD = "foobarbaz"; 121 private static final String USER2_PASSWORD = "bazbarfoo"; 122 123 @Parameters 124 public static Collection<Object[]> parameters() { 125 return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() }, 126 new Object[] { NettyRpcClient.class.getName() }); 127 } 128 129 @Parameter 130 public String rpcClientImpl; 131 132 private static Map<String, String> createUserDatabase() { 133 Map<String, String> db = new ConcurrentHashMap<>(); 134 db.put("user1", USER1_PASSWORD); 135 db.put("user2", USER2_PASSWORD); 136 return db; 137 } 138 139 public static String getPassword(String user) { 140 String password = USER_DATABASE.get(user); 141 if (password == null) { 142 throw new IllegalStateException("Cannot request password for a user that doesn't exist"); 143 } 144 return password; 145 } 146 147 /** 148 * A custom token identifier for our custom auth'n method. Unique from the TokenIdentifier used 149 * for delegation tokens. 150 */ 151 public static class PasswordAuthTokenIdentifier extends TokenIdentifier { 152 public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN"); 153 private String username; 154 155 public PasswordAuthTokenIdentifier() { 156 } 157 158 public PasswordAuthTokenIdentifier(String username) { 159 this.username = username; 160 } 161 162 @Override 163 public void readFields(DataInput in) throws IOException { 164 this.username = WritableUtils.readString(in); 165 } 166 167 @Override 168 public void write(DataOutput out) throws IOException { 169 WritableUtils.writeString(out, username); 170 } 171 172 @Override 173 public Text getKind() { 174 return PASSWORD_AUTH_TOKEN; 175 } 176 177 @Override 178 public UserGroupInformation getUser() { 179 if (username == null || "".equals(username)) { 180 return null; 181 } 182 return UserGroupInformation.createRemoteUser(username); 183 } 184 } 185 186 public static Token<? extends TokenIdentifier> createPasswordToken(String username, 187 String password, String clusterId) { 188 PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(username); 189 Token<? extends TokenIdentifier> token = 190 new Token<>(id.getBytes(), Bytes.toBytes(password), id.getKind(), new Text(clusterId)); 191 return token; 192 } 193 194 /** 195 * Client provider that finds custom Token in the user's UGI and authenticates with the server via 196 * DIGEST-MD5 using that password. 197 */ 198 public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider { 199 public static final String MECHANISM = "DIGEST-MD5"; 200 public static final SaslAuthMethod SASL_AUTH_METHOD = 201 new SaslAuthMethod("IN_MEMORY", (byte) 42, MECHANISM, AuthenticationMethod.TOKEN); 202 203 @Override 204 public SaslClient createClient(Configuration conf, InetAddress serverAddr, 205 SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, 206 Map<String, String> saslProps) throws IOException { 207 return Sasl.createSaslClient(new String[] { MECHANISM }, null, null, 208 SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token)); 209 } 210 211 public Optional<Token<? extends TokenIdentifier>> findToken(User user) { 212 List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream() 213 .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN)) 214 .collect(Collectors.toList()); 215 if (tokens.isEmpty()) { 216 return Optional.empty(); 217 } 218 if (tokens.size() > 1) { 219 throw new IllegalStateException("Cannot handle more than one PasswordAuthToken"); 220 } 221 return Optional.of(tokens.get(0)); 222 } 223 224 @Override 225 public SaslAuthMethod getSaslAuthMethod() { 226 return SASL_AUTH_METHOD; 227 } 228 229 /** 230 * Sasl CallbackHandler which extracts information from our custom token and places it into the 231 * Sasl objects. 232 */ 233 public class InMemoryClientProviderCallbackHandler implements CallbackHandler { 234 private final Token<? extends TokenIdentifier> token; 235 236 public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) { 237 this.token = token; 238 } 239 240 @Override 241 public void handle(Callback[] callbacks) throws UnsupportedCallbackException { 242 NameCallback nc = null; 243 PasswordCallback pc = null; 244 RealmCallback rc = null; 245 for (Callback callback : callbacks) { 246 if (callback instanceof RealmChoiceCallback) { 247 continue; 248 } else if (callback instanceof NameCallback) { 249 nc = (NameCallback) callback; 250 } else if (callback instanceof PasswordCallback) { 251 pc = (PasswordCallback) callback; 252 } else if (callback instanceof RealmCallback) { 253 rc = (RealmCallback) callback; 254 } else { 255 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 256 } 257 } 258 if (nc != null) { 259 nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier())); 260 } 261 if (pc != null) { 262 pc.setPassword(SaslUtil.encodePassword(token.getPassword())); 263 } 264 if (rc != null) { 265 rc.setText(rc.getDefaultText()); 266 } 267 } 268 } 269 270 @Override 271 public UserInformation getUserInfo(User user) { 272 return null; 273 } 274 } 275 276 /** 277 * Server provider which validates credentials from an in-memory database. 278 */ 279 public static class InMemoryServerProvider extends InMemoryClientProvider 280 implements SaslServerAuthenticationProvider { 281 282 @Override 283 public AttemptingUserProvidingSaslServer 284 createServer(SecretManager<TokenIdentifier> secretManager, Map<String, String> saslProps) 285 throws IOException { 286 return new AttemptingUserProvidingSaslServer( 287 Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null, 288 SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()), 289 () -> null); 290 } 291 292 /** 293 * Pulls the correct password for the user who started the SASL handshake so that SASL can 294 * validate that the user provided the right password. 295 */ 296 private class InMemoryServerProviderCallbackHandler implements CallbackHandler { 297 298 @Override 299 public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException { 300 NameCallback nc = null; 301 PasswordCallback pc = null; 302 AuthorizeCallback ac = null; 303 for (Callback callback : callbacks) { 304 if (callback instanceof AuthorizeCallback) { 305 ac = (AuthorizeCallback) callback; 306 } else if (callback instanceof NameCallback) { 307 nc = (NameCallback) callback; 308 } else if (callback instanceof PasswordCallback) { 309 pc = (PasswordCallback) callback; 310 } else if (callback instanceof RealmCallback) { 311 continue; // realm is ignored 312 } else { 313 throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback"); 314 } 315 } 316 if (nc != null && pc != null) { 317 byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName()); 318 PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(); 319 try { 320 id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId))); 321 } catch (IOException e) { 322 throw (InvalidToken) new InvalidToken("Can't de-serialize tokenIdentifier") 323 .initCause(e); 324 } 325 char[] actualPassword = 326 SaslUtil.encodePassword(Bytes.toBytes(getPassword(id.getUser().getUserName()))); 327 pc.setPassword(actualPassword); 328 } 329 if (ac != null) { 330 String authid = ac.getAuthenticationID(); 331 String authzid = ac.getAuthorizationID(); 332 if (authid.equals(authzid)) { 333 ac.setAuthorized(true); 334 } else { 335 ac.setAuthorized(false); 336 } 337 if (ac.isAuthorized()) { 338 ac.setAuthorizedID(authzid); 339 } 340 } 341 } 342 } 343 344 @Override 345 public boolean supportsProtocolAuthentication() { 346 return false; 347 } 348 349 @Override 350 public UserGroupInformation getAuthorizedUgi(String authzId, 351 SecretManager<TokenIdentifier> secretManager) throws IOException { 352 UserGroupInformation authorizedUgi; 353 byte[] encodedId = SaslUtil.decodeIdentifier(authzId); 354 PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier(); 355 try { 356 tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId))); 357 } catch (IOException e) { 358 throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e); 359 } 360 authorizedUgi = tokenId.getUser(); 361 if (authorizedUgi == null) { 362 throw new AccessDeniedException("Can't retrieve username from tokenIdentifier."); 363 } 364 authorizedUgi.addTokenIdentifier(tokenId); 365 authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod()); 366 return authorizedUgi; 367 } 368 } 369 370 /** 371 * Custom provider which can select our custom provider, amongst other tokens which may be 372 * available. 373 */ 374 public static class InMemoryProviderSelector extends BuiltInProviderSelector { 375 private InMemoryClientProvider inMemoryProvider; 376 377 @Override 378 public void configure(Configuration conf, 379 Collection<SaslClientAuthenticationProvider> providers) { 380 super.configure(conf, providers); 381 Optional<SaslClientAuthenticationProvider> o = 382 providers.stream().filter((p) -> p instanceof InMemoryClientProvider).findAny(); 383 384 inMemoryProvider = (InMemoryClientProvider) o.orElseThrow(() -> new RuntimeException( 385 "InMemoryClientProvider not found in available providers: " + providers)); 386 } 387 388 @Override 389 public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> 390 selectProvider(String clusterId, User user) { 391 Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> superPair = 392 super.selectProvider(clusterId, user); 393 394 Optional<Token<? extends TokenIdentifier>> optional = inMemoryProvider.findToken(user); 395 if (optional.isPresent()) { 396 LOG.info("Using InMemoryClientProvider"); 397 return new Pair<>(inMemoryProvider, optional.get()); 398 } 399 400 LOG.info("InMemoryClientProvider not usable, falling back to {}", superPair); 401 return superPair; 402 } 403 } 404 405 private static void createBaseCluster(HBaseTestingUtility util, File keytabFile, MiniKdc kdc) 406 throws Exception { 407 String servicePrincipal = "hbase/localhost"; 408 String spnegoPrincipal = "HTTP/localhost"; 409 kdc.createPrincipal(keytabFile, servicePrincipal); 410 util.startMiniZKCluster(); 411 412 HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(), 413 servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm()); 414 HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class); 415 416 util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 417 TokenProvider.class.getName()); 418 util.startMiniDFSCluster(1); 419 Path rootdir = util.getDataTestDirOnTestFS("TestCustomSaslAuthenticationProvider"); 420 CommonFSUtils.setRootDir(util.getConfiguration(), rootdir); 421 } 422 423 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 424 private static final Configuration CONF = UTIL.getConfiguration(); 425 private static LocalHBaseCluster CLUSTER; 426 private static File KEYTAB_FILE; 427 428 protected static void startCluster(String rpcServerImpl) throws Exception { 429 KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath()); 430 final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE); 431 432 // Adds our test impls instead of creating service loader entries which 433 // might inadvertently get them loaded on a real cluster. 434 CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY, 435 InMemoryClientProvider.class.getName()); 436 CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY, 437 InMemoryServerProvider.class.getName()); 438 CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY, 439 InMemoryProviderSelector.class.getName()); 440 CONF.setLong(HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN, 600); 441 createBaseCluster(UTIL, KEYTAB_FILE, kdc); 442 CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl); 443 CLUSTER = new LocalHBaseCluster(CONF, 1); 444 CLUSTER.startup(); 445 } 446 447 @AfterClass 448 public static void shutdownCluster() throws Exception { 449 if (CLUSTER != null) { 450 CLUSTER.shutdown(); 451 CLUSTER = null; 452 } 453 UTIL.shutdownMiniDFSCluster(); 454 UTIL.shutdownMiniZKCluster(); 455 UTIL.cleanupTestDir(); 456 } 457 458 @Before 459 public void setUp() throws Exception { 460 createTable(); 461 } 462 463 @After 464 public void tearDown() throws IOException { 465 UTIL.deleteTable(name.getTableName()); 466 } 467 468 @Rule 469 public TableNameTestRule name = new TableNameTestRule(); 470 471 private TableName tableName; 472 473 private String clusterId; 474 475 private void createTable() throws Exception { 476 tableName = name.getTableName(); 477 478 // Create a table and write a record as the service user (hbase) 479 UserGroupInformation serviceUgi = UserGroupInformation 480 .loginUserFromKeytabAndReturnUGI("hbase/localhost", KEYTAB_FILE.getAbsolutePath()); 481 clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() { 482 @Override 483 public String run() throws Exception { 484 try (Connection conn = ConnectionFactory.createConnection(CONF); 485 Admin admin = conn.getAdmin();) { 486 admin.createTable(TableDescriptorBuilder.newBuilder(tableName) 487 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1")).build()); 488 489 UTIL.waitTableAvailable(tableName); 490 491 try (Table t = conn.getTable(tableName)) { 492 Put p = new Put(Bytes.toBytes("r1")); 493 p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1")); 494 t.put(p); 495 } 496 497 return admin.getClusterMetrics().getClusterId(); 498 } 499 } 500 }); 501 assertNotNull(clusterId); 502 } 503 504 private Configuration getClientConf() { 505 Configuration conf = new Configuration(CONF); 506 conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl); 507 return conf; 508 } 509 510 @Test 511 public void testPositiveAuthentication() throws Exception { 512 // Validate that we can read that record back out as the user with our custom auth'n 513 UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]); 514 user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId)); 515 user1.doAs(new PrivilegedExceptionAction<Void>() { 516 @Override 517 public Void run() throws Exception { 518 try (Connection conn = ConnectionFactory.createConnection(getClientConf()); 519 Table t = conn.getTable(tableName)) { 520 Result r = t.get(new Get(Bytes.toBytes("r1"))); 521 assertNotNull(r); 522 assertFalse("Should have read a non-empty Result", r.isEmpty()); 523 final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1")); 524 assertTrue("Unexpected value", CellUtil.matchingValue(cell, Bytes.toBytes("1"))); 525 526 return null; 527 } 528 } 529 }); 530 } 531 532 @Test 533 public void testNegativeAuthentication() throws Exception { 534 // Validate that we can read that record back out as the user with our custom auth'n 535 final Configuration clientConf = new Configuration(CONF); 536 // This test does not work with master registry in branch-2 because of a nuance in the non-async 537 // connection implementation. See the detail below. 538 clientConf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 539 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 540 clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); 541 UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]); 542 user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId)); 543 user1.doAs(new PrivilegedExceptionAction<Void>() { 544 @Override 545 public Void run() throws Exception { 546 // There is a slight behavioral difference here in the 3.x vs 2.x branches. 3.x branches 547 // use async client connection implementation which throws if there is an exception when 548 // fetching the clusterId(). 2.x branches that use non-async client falls back to using a 549 // DEFAULT cluster ID in such cases. 3.x behavior makes more sense, especially if the 550 // exception is of type InvalidToken (digest mis-match), however I did not want to fix it 551 // since it makes sense only when master registry is in use (which has RPCs to master). 552 // That is the reason if you see a slight difference in the test between 3.x and 2.x. 553 try (Connection conn = ConnectionFactory.createConnection(clientConf); 554 Table t = conn.getTable(tableName)) { 555 t.get(new Get(Bytes.toBytes("r1"))); 556 fail("Should not successfully authenticate with HBase"); 557 } catch (RetriesExhaustedException re) { 558 assertTrue(re.getMessage(), re.getMessage().contains("SaslException")); 559 } catch (Exception e) { 560 // Any other exception is unexpected. 561 fail("Unexpected exception caught, was expecting a authentication error: " 562 + Throwables.getStackTraceAsString(e)); 563 } 564 return null; 565 } 566 }); 567 } 568}