001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.provider;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.containsString;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertNotNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.junit.jupiter.api.Assertions.fail;
026
027import java.io.ByteArrayInputStream;
028import java.io.DataInput;
029import java.io.DataInputStream;
030import java.io.DataOutput;
031import java.io.File;
032import java.io.IOException;
033import java.net.InetAddress;
034import java.security.PrivilegedExceptionAction;
035import java.util.Collection;
036import java.util.List;
037import java.util.Map;
038import java.util.Optional;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.stream.Collectors;
041import java.util.stream.Stream;
042import javax.security.auth.callback.Callback;
043import javax.security.auth.callback.CallbackHandler;
044import javax.security.auth.callback.NameCallback;
045import javax.security.auth.callback.PasswordCallback;
046import javax.security.auth.callback.UnsupportedCallbackException;
047import javax.security.sasl.AuthorizeCallback;
048import javax.security.sasl.RealmCallback;
049import javax.security.sasl.RealmChoiceCallback;
050import javax.security.sasl.Sasl;
051import javax.security.sasl.SaslClient;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.hbase.Cell;
055import org.apache.hadoop.hbase.CellUtil;
056import org.apache.hadoop.hbase.HBaseTestingUtil;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.LocalHBaseCluster;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.TableNameTestExtension;
061import org.apache.hadoop.hbase.client.Admin;
062import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
063import org.apache.hadoop.hbase.client.Connection;
064import org.apache.hadoop.hbase.client.ConnectionFactory;
065import org.apache.hadoop.hbase.client.Get;
066import org.apache.hadoop.hbase.client.Put;
067import org.apache.hadoop.hbase.client.Result;
068import org.apache.hadoop.hbase.client.RetriesExhaustedException;
069import org.apache.hadoop.hbase.client.Table;
070import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
071import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
072import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
073import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
074import org.apache.hadoop.hbase.ipc.NettyRpcClient;
075import org.apache.hadoop.hbase.ipc.RpcClientFactory;
076import org.apache.hadoop.hbase.ipc.RpcServerFactory;
077import org.apache.hadoop.hbase.security.AccessDeniedException;
078import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
079import org.apache.hadoop.hbase.security.SaslUtil;
080import org.apache.hadoop.hbase.security.User;
081import org.apache.hadoop.hbase.security.token.SecureTestCluster;
082import org.apache.hadoop.hbase.security.token.TokenProvider;
083import org.apache.hadoop.hbase.util.Bytes;
084import org.apache.hadoop.hbase.util.CommonFSUtils;
085import org.apache.hadoop.hbase.util.Pair;
086import org.apache.hadoop.io.Text;
087import org.apache.hadoop.io.WritableUtils;
088import org.apache.hadoop.minikdc.MiniKdc;
089import org.apache.hadoop.security.UserGroupInformation;
090import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
091import org.apache.hadoop.security.token.SecretManager;
092import org.apache.hadoop.security.token.SecretManager.InvalidToken;
093import org.apache.hadoop.security.token.Token;
094import org.apache.hadoop.security.token.TokenIdentifier;
095import org.junit.jupiter.api.AfterAll;
096import org.junit.jupiter.api.AfterEach;
097import org.junit.jupiter.api.BeforeEach;
098import org.junit.jupiter.api.TestTemplate;
099import org.junit.jupiter.api.extension.RegisterExtension;
100import org.junit.jupiter.params.provider.Arguments;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103
104import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
105
106import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
107
108/**
109 * Tests the pluggable authentication framework with SASL using a contrived authentication system.
 * This test holds a "user database" in memory as a hashmap. Clients provide their password in a
 * custom token added to their UGI. The servers validate this password via the "user database".
112 */
113public abstract class CustomSaslAuthenticationProviderTestBase {
114
115  private static final Logger LOG =
116    LoggerFactory.getLogger(CustomSaslAuthenticationProviderTestBase.class);
117
118  private static final Map<String, String> USER_DATABASE = createUserDatabase();
119
120  private static final String USER1_PASSWORD = "foobarbaz";
121  private static final String USER2_PASSWORD = "bazbarfoo";
122
123  public static Stream<Arguments> parameters() {
124    return Stream.of(Arguments.of(BlockingRpcClient.class.getName()),
125      Arguments.of(NettyRpcClient.class.getName()));
126  }
127
128  private String rpcClientImpl;
129
130  public CustomSaslAuthenticationProviderTestBase(String rpcClientImpl) {
131    this.rpcClientImpl = rpcClientImpl;
132  }
133
134  private static Map<String, String> createUserDatabase() {
135    Map<String, String> db = new ConcurrentHashMap<>();
136    db.put("user1", USER1_PASSWORD);
137    db.put("user2", USER2_PASSWORD);
138    return db;
139  }
140
141  public static String getPassword(String user) {
142    String password = USER_DATABASE.get(user);
143    if (password == null) {
144      throw new IllegalStateException("Cannot request password for a user that doesn't exist");
145    }
146    return password;
147  }
148
149  /**
150   * A custom token identifier for our custom auth'n method. Unique from the TokenIdentifier used
151   * for delegation tokens.
152   */
153  public static class PasswordAuthTokenIdentifier extends TokenIdentifier {
154    public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN");
155    private String username;
156
157    public PasswordAuthTokenIdentifier() {
158    }
159
160    public PasswordAuthTokenIdentifier(String username) {
161      this.username = username;
162    }
163
164    @Override
165    public void readFields(DataInput in) throws IOException {
166      this.username = WritableUtils.readString(in);
167    }
168
169    @Override
170    public void write(DataOutput out) throws IOException {
171      WritableUtils.writeString(out, username);
172    }
173
174    @Override
175    public Text getKind() {
176      return PASSWORD_AUTH_TOKEN;
177    }
178
179    @Override
180    public UserGroupInformation getUser() {
181      if (username == null || "".equals(username)) {
182        return null;
183      }
184      return UserGroupInformation.createRemoteUser(username);
185    }
186  }
187
188  public static Token<? extends TokenIdentifier> createPasswordToken(String username,
189    String password, String clusterId) {
190    PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(username);
191    Token<? extends TokenIdentifier> token =
192      new Token<>(id.getBytes(), Bytes.toBytes(password), id.getKind(), new Text(clusterId));
193    return token;
194  }
195
196  /**
197   * Client provider that finds custom Token in the user's UGI and authenticates with the server via
198   * DIGEST-MD5 using that password.
199   */
200  public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider {
201    public static final String MECHANISM = "DIGEST-MD5";
202    public static final SaslAuthMethod SASL_AUTH_METHOD =
203      new SaslAuthMethod("IN_MEMORY", (byte) 42, MECHANISM, AuthenticationMethod.TOKEN);
204
205    @Override
206    public SaslClient createClient(Configuration conf, InetAddress serverAddr,
207      String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
208      Map<String, String> saslProps) throws IOException {
209      return Sasl.createSaslClient(new String[] { MECHANISM }, null, null,
210        SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token));
211    }
212
213    public Optional<Token<? extends TokenIdentifier>> findToken(User user) {
214      List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream()
215        .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN))
216        .collect(Collectors.toList());
217      if (tokens.isEmpty()) {
218        return Optional.empty();
219      }
220      if (tokens.size() > 1) {
221        throw new IllegalStateException("Cannot handle more than one PasswordAuthToken");
222      }
223      return Optional.of(tokens.get(0));
224    }
225
226    @Override
227    public SaslAuthMethod getSaslAuthMethod() {
228      return SASL_AUTH_METHOD;
229    }
230
231    /**
232     * Sasl CallbackHandler which extracts information from our custom token and places it into the
233     * Sasl objects.
234     */
235    public class InMemoryClientProviderCallbackHandler implements CallbackHandler {
236      private final Token<? extends TokenIdentifier> token;
237
238      public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) {
239        this.token = token;
240      }
241
242      @Override
243      public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
244        NameCallback nc = null;
245        PasswordCallback pc = null;
246        RealmCallback rc = null;
247        for (Callback callback : callbacks) {
248          if (callback instanceof RealmChoiceCallback) {
249            continue;
250          } else if (callback instanceof NameCallback) {
251            nc = (NameCallback) callback;
252          } else if (callback instanceof PasswordCallback) {
253            pc = (PasswordCallback) callback;
254          } else if (callback instanceof RealmCallback) {
255            rc = (RealmCallback) callback;
256          } else {
257            throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
258          }
259        }
260        if (nc != null) {
261          nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier()));
262        }
263        if (pc != null) {
264          pc.setPassword(SaslUtil.encodePassword(token.getPassword()));
265        }
266        if (rc != null) {
267          rc.setText(rc.getDefaultText());
268        }
269      }
270    }
271
272    @Override
273    public UserInformation getUserInfo(User user) {
274      return null;
275    }
276  }
277
278  /**
279   * Server provider which validates credentials from an in-memory database.
280   */
281  public static class InMemoryServerProvider extends InMemoryClientProvider
282    implements SaslServerAuthenticationProvider {
283
284    @Override
285    public AttemptingUserProvidingSaslServer
286      createServer(SecretManager<TokenIdentifier> secretManager, Map<String, String> saslProps)
287        throws IOException {
288      return new AttemptingUserProvidingSaslServer(
289        Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null,
290          SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()),
291        () -> null);
292    }
293
294    /**
295     * Pulls the correct password for the user who started the SASL handshake so that SASL can
296     * validate that the user provided the right password.
297     */
298    private class InMemoryServerProviderCallbackHandler implements CallbackHandler {
299
300      @Override
301      public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException {
302        NameCallback nc = null;
303        PasswordCallback pc = null;
304        AuthorizeCallback ac = null;
305        for (Callback callback : callbacks) {
306          if (callback instanceof AuthorizeCallback) {
307            ac = (AuthorizeCallback) callback;
308          } else if (callback instanceof NameCallback) {
309            nc = (NameCallback) callback;
310          } else if (callback instanceof PasswordCallback) {
311            pc = (PasswordCallback) callback;
312          } else if (callback instanceof RealmCallback) {
313            continue; // realm is ignored
314          } else {
315            throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback");
316          }
317        }
318        if (nc != null && pc != null) {
319          byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName());
320          PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier();
321          try {
322            id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
323          } catch (IOException e) {
324            throw (InvalidToken) new InvalidToken("Can't de-serialize tokenIdentifier")
325              .initCause(e);
326          }
327          char[] actualPassword =
328            SaslUtil.encodePassword(Bytes.toBytes(getPassword(id.getUser().getUserName())));
329          pc.setPassword(actualPassword);
330        }
331        if (ac != null) {
332          String authid = ac.getAuthenticationID();
333          String authzid = ac.getAuthorizationID();
334          if (authid.equals(authzid)) {
335            ac.setAuthorized(true);
336          } else {
337            ac.setAuthorized(false);
338          }
339          if (ac.isAuthorized()) {
340            ac.setAuthorizedID(authzid);
341          }
342        }
343      }
344    }
345
346    @Override
347    public boolean supportsProtocolAuthentication() {
348      return false;
349    }
350
351    @Override
352    public UserGroupInformation getAuthorizedUgi(String authzId,
353      SecretManager<TokenIdentifier> secretManager) throws IOException {
354      UserGroupInformation authorizedUgi;
355      byte[] encodedId = SaslUtil.decodeIdentifier(authzId);
356      PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier();
357      try {
358        tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
359      } catch (IOException e) {
360        throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e);
361      }
362      authorizedUgi = tokenId.getUser();
363      if (authorizedUgi == null) {
364        throw new AccessDeniedException("Can't retrieve username from tokenIdentifier.");
365      }
366      authorizedUgi.addTokenIdentifier(tokenId);
367      authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod());
368      return authorizedUgi;
369    }
370  }
371
372  /**
373   * Custom provider which can select our custom provider, amongst other tokens which may be
374   * available.
375   */
376  public static class InMemoryProviderSelector extends BuiltInProviderSelector {
377    private InMemoryClientProvider inMemoryProvider;
378
379    @Override
380    public void configure(Configuration conf,
381      Collection<SaslClientAuthenticationProvider> providers) {
382      super.configure(conf, providers);
383      Optional<SaslClientAuthenticationProvider> o =
384        providers.stream().filter((p) -> p instanceof InMemoryClientProvider).findAny();
385
386      inMemoryProvider = (InMemoryClientProvider) o.orElseThrow(() -> new RuntimeException(
387        "InMemoryClientProvider not found in available providers: " + providers));
388    }
389
390    @Override
391    public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
392      selectProvider(String clusterId, User user) {
393      Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> superPair =
394        super.selectProvider(clusterId, user);
395
396      Optional<Token<? extends TokenIdentifier>> optional = inMemoryProvider.findToken(user);
397      if (optional.isPresent()) {
398        LOG.info("Using InMemoryClientProvider");
399        return new Pair<>(inMemoryProvider, optional.get());
400      }
401
402      LOG.info("InMemoryClientProvider not usable, falling back to {}", superPair);
403      return superPair;
404    }
405  }
406
407  private static void createBaseCluster(HBaseTestingUtil util, File keytabFile, MiniKdc kdc)
408    throws Exception {
409    String servicePrincipal = "hbase/localhost";
410    String spnegoPrincipal = "HTTP/localhost";
411    kdc.createPrincipal(keytabFile, servicePrincipal);
412    util.startMiniZKCluster();
413
414    HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(),
415      servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm());
416    HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class);
417
418    util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
419      TokenProvider.class.getName());
420    util.startMiniDFSCluster(1);
421    Path rootdir = util.getDataTestDirOnTestFS("TestCustomSaslAuthenticationProvider");
422    CommonFSUtils.setRootDir(util.getConfiguration(), rootdir);
423  }
424
425  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
426  private static final Configuration CONF = UTIL.getConfiguration();
427  private static LocalHBaseCluster CLUSTER;
428  private static File KEYTAB_FILE;
429
430  protected static void startCluster(String rpcServerImpl) throws Exception {
431    KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
432    final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE);
433
434    // Adds our test impls instead of creating service loader entries which
435    // might inadvertently get them loaded on a real cluster.
436    CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY,
437      InMemoryClientProvider.class.getName());
438    CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY,
439      InMemoryServerProvider.class.getName());
440    CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY,
441      InMemoryProviderSelector.class.getName());
442    createBaseCluster(UTIL, KEYTAB_FILE, kdc);
443    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
444    CLUSTER = new LocalHBaseCluster(CONF, 1);
445    CLUSTER.startup();
446  }
447
448  @AfterAll
449  public static void shutdownCluster() throws Exception {
450    if (CLUSTER != null) {
451      CLUSTER.shutdown();
452      CLUSTER = null;
453    }
454    UTIL.shutdownMiniDFSCluster();
455    UTIL.shutdownMiniZKCluster();
456    UTIL.cleanupTestDir();
457  }
458
459  @BeforeEach
460  public void setUp() throws Exception {
461    createTable();
462  }
463
464  @AfterEach
465  public void tearDown() throws IOException {
466    UTIL.deleteTable(name.getTableName());
467  }
468
469  @RegisterExtension
470  private final TableNameTestExtension name = new TableNameTestExtension();
471
472  private TableName tableName;
473
474  private String clusterId;
475
476  private void createTable() throws Exception {
477    tableName = name.getTableName();
478
479    // Create a table and write a record as the service user (hbase)
480    UserGroupInformation serviceUgi = UserGroupInformation
481      .loginUserFromKeytabAndReturnUGI("hbase/localhost", KEYTAB_FILE.getAbsolutePath());
482    clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() {
483      @Override
484      public String run() throws Exception {
485        try (Connection conn = ConnectionFactory.createConnection(CONF);
486          Admin admin = conn.getAdmin();) {
487          admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
488            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1")).build());
489
490          UTIL.waitTableAvailable(tableName);
491
492          try (Table t = conn.getTable(tableName)) {
493            Put p = new Put(Bytes.toBytes("r1"));
494            p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
495            t.put(p);
496          }
497
498          return admin.getClusterMetrics().getClusterId();
499        }
500      }
501    });
502    assertNotNull(clusterId);
503  }
504
505  private Configuration getClientConf() {
506    Configuration conf = new Configuration(CONF);
507    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
508    return conf;
509  }
510
511  @TestTemplate
512  public void testPositiveAuthentication() throws Exception {
513    // Validate that we can read that record back out as the user with our custom auth'n
514    UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]);
515    user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId));
516    user1.doAs(new PrivilegedExceptionAction<Void>() {
517      @Override
518      public Void run() throws Exception {
519        try (Connection conn = ConnectionFactory.createConnection(getClientConf());
520          Table t = conn.getTable(tableName)) {
521          Result r = t.get(new Get(Bytes.toBytes("r1")));
522          assertNotNull(r);
523          assertFalse(r.isEmpty(), "Should have read a non-empty Result");
524          final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
525          assertTrue(CellUtil.matchingValue(cell, Bytes.toBytes("1")), "Unexpected value");
526
527          return null;
528        }
529      }
530    });
531  }
532
533  @TestTemplate
534  public void testNegativeAuthentication() throws Exception {
535    // Validate that we can read that record back out as the user with our custom auth'n
536    UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]);
537    user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId));
538    user1.doAs(new PrivilegedExceptionAction<Void>() {
539      @Override
540      public Void run() throws Exception {
541        Configuration clientConf = getClientConf();
542        clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
543        // Depending on the registry in use, the following code can throw exceptions at different
544        // places. Master registry fails at the createConnection() step because the RPC to the
545        // master fails with sasl auth. With ZK registry, connection creation succeeds (since there
546        // is no RPC to HBase services involved) but the subsequent get() fails. The root cause
547        // should still be a SaslException in both the cases.
548        try (Connection conn = ConnectionFactory.createConnection(clientConf);
549          Table t = conn.getTable(tableName)) {
550          t.get(new Get(Bytes.toBytes("r1")));
551          fail("Should not successfully authenticate with HBase");
552        } catch (MasterRegistryFetchException mfe) {
553          Throwable cause = mfe.getCause();
554          assertThat(cause.getMessage(), containsString("SaslException"));
555        } catch (RetriesExhaustedException re) {
556          assertThat(re.getMessage(), containsString("SaslException"));
557        } catch (Exception e) {
558          // Any other exception is unexpected.
559          fail("Unexpected exception caught, was expecting a authentication error: "
560            + Throwables.getStackTraceAsString(e));
561        }
562        return null;
563      }
564    });
565  }
566}