001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.provider;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.ByteArrayInputStream;
026import java.io.DataInput;
027import java.io.DataInputStream;
028import java.io.DataOutput;
029import java.io.File;
030import java.io.IOException;
031import java.net.InetAddress;
032import java.security.PrivilegedExceptionAction;
033import java.util.Arrays;
034import java.util.Collection;
035import java.util.List;
036import java.util.Map;
037import java.util.Optional;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.stream.Collectors;
040import javax.security.auth.callback.Callback;
041import javax.security.auth.callback.CallbackHandler;
042import javax.security.auth.callback.NameCallback;
043import javax.security.auth.callback.PasswordCallback;
044import javax.security.auth.callback.UnsupportedCallbackException;
045import javax.security.sasl.AuthorizeCallback;
046import javax.security.sasl.RealmCallback;
047import javax.security.sasl.RealmChoiceCallback;
048import javax.security.sasl.Sasl;
049import javax.security.sasl.SaslClient;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.fs.Path;
052import org.apache.hadoop.hbase.Cell;
053import org.apache.hadoop.hbase.CellUtil;
054import org.apache.hadoop.hbase.HBaseTestingUtil;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.LocalHBaseCluster;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.TableNameTestRule;
059import org.apache.hadoop.hbase.client.Admin;
060import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
061import org.apache.hadoop.hbase.client.Connection;
062import org.apache.hadoop.hbase.client.ConnectionFactory;
063import org.apache.hadoop.hbase.client.Get;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.Result;
066import org.apache.hadoop.hbase.client.RetriesExhaustedException;
067import org.apache.hadoop.hbase.client.Table;
068import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
069import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
070import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
071import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
072import org.apache.hadoop.hbase.ipc.NettyRpcClient;
073import org.apache.hadoop.hbase.ipc.RpcClientFactory;
074import org.apache.hadoop.hbase.ipc.RpcServerFactory;
075import org.apache.hadoop.hbase.security.AccessDeniedException;
076import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
077import org.apache.hadoop.hbase.security.SaslUtil;
078import org.apache.hadoop.hbase.security.User;
079import org.apache.hadoop.hbase.security.token.SecureTestCluster;
080import org.apache.hadoop.hbase.security.token.TokenProvider;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.hbase.util.CommonFSUtils;
083import org.apache.hadoop.hbase.util.Pair;
084import org.apache.hadoop.io.Text;
085import org.apache.hadoop.io.WritableUtils;
086import org.apache.hadoop.minikdc.MiniKdc;
087import org.apache.hadoop.security.UserGroupInformation;
088import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
089import org.apache.hadoop.security.token.SecretManager;
090import org.apache.hadoop.security.token.SecretManager.InvalidToken;
091import org.apache.hadoop.security.token.Token;
092import org.apache.hadoop.security.token.TokenIdentifier;
093import org.junit.After;
094import org.junit.AfterClass;
095import org.junit.Before;
096import org.junit.Rule;
097import org.junit.Test;
098import org.junit.runners.Parameterized.Parameter;
099import org.junit.runners.Parameterized.Parameters;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
104
105import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
106
107/**
108 * Tests the pluggable authentication framework with SASL using a contrived authentication system.
109 * This tests holds a "user database" in memory as a hashmap. Clients provide their password in the
110 * client Hadoop configuration. The servers validate this password via the "user database".
111 */
112public abstract class CustomSaslAuthenticationProviderTestBase {
113
114  private static final Logger LOG =
115    LoggerFactory.getLogger(CustomSaslAuthenticationProviderTestBase.class);
116
117  private static final Map<String, String> USER_DATABASE = createUserDatabase();
118
119  private static final String USER1_PASSWORD = "foobarbaz";
120  private static final String USER2_PASSWORD = "bazbarfoo";
121
122  @Parameters
123  public static Collection<Object[]> parameters() {
124    return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() },
125      new Object[] { NettyRpcClient.class.getName() });
126  }
127
128  @Parameter
129  public String rpcClientImpl;
130
131  private static Map<String, String> createUserDatabase() {
132    Map<String, String> db = new ConcurrentHashMap<>();
133    db.put("user1", USER1_PASSWORD);
134    db.put("user2", USER2_PASSWORD);
135    return db;
136  }
137
138  public static String getPassword(String user) {
139    String password = USER_DATABASE.get(user);
140    if (password == null) {
141      throw new IllegalStateException("Cannot request password for a user that doesn't exist");
142    }
143    return password;
144  }
145
146  /**
147   * A custom token identifier for our custom auth'n method. Unique from the TokenIdentifier used
148   * for delegation tokens.
149   */
150  public static class PasswordAuthTokenIdentifier extends TokenIdentifier {
151    public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN");
152    private String username;
153
154    public PasswordAuthTokenIdentifier() {
155    }
156
157    public PasswordAuthTokenIdentifier(String username) {
158      this.username = username;
159    }
160
161    @Override
162    public void readFields(DataInput in) throws IOException {
163      this.username = WritableUtils.readString(in);
164    }
165
166    @Override
167    public void write(DataOutput out) throws IOException {
168      WritableUtils.writeString(out, username);
169    }
170
171    @Override
172    public Text getKind() {
173      return PASSWORD_AUTH_TOKEN;
174    }
175
176    @Override
177    public UserGroupInformation getUser() {
178      if (username == null || "".equals(username)) {
179        return null;
180      }
181      return UserGroupInformation.createRemoteUser(username);
182    }
183  }
184
185  public static Token<? extends TokenIdentifier> createPasswordToken(String username,
186    String password, String clusterId) {
187    PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(username);
188    Token<? extends TokenIdentifier> token =
189      new Token<>(id.getBytes(), Bytes.toBytes(password), id.getKind(), new Text(clusterId));
190    return token;
191  }
192
193  /**
194   * Client provider that finds custom Token in the user's UGI and authenticates with the server via
195   * DIGEST-MD5 using that password.
196   */
197  public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider {
198    public static final String MECHANISM = "DIGEST-MD5";
199    public static final SaslAuthMethod SASL_AUTH_METHOD =
200      new SaslAuthMethod("IN_MEMORY", (byte) 42, MECHANISM, AuthenticationMethod.TOKEN);
201
202    @Override
203    public SaslClient createClient(Configuration conf, InetAddress serverAddr,
204      String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
205      Map<String, String> saslProps) throws IOException {
206      return Sasl.createSaslClient(new String[] { MECHANISM }, null, null,
207        SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token));
208    }
209
210    public Optional<Token<? extends TokenIdentifier>> findToken(User user) {
211      List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream()
212        .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN))
213        .collect(Collectors.toList());
214      if (tokens.isEmpty()) {
215        return Optional.empty();
216      }
217      if (tokens.size() > 1) {
218        throw new IllegalStateException("Cannot handle more than one PasswordAuthToken");
219      }
220      return Optional.of(tokens.get(0));
221    }
222
223    @Override
224    public SaslAuthMethod getSaslAuthMethod() {
225      return SASL_AUTH_METHOD;
226    }
227
228    /**
229     * Sasl CallbackHandler which extracts information from our custom token and places it into the
230     * Sasl objects.
231     */
232    public class InMemoryClientProviderCallbackHandler implements CallbackHandler {
233      private final Token<? extends TokenIdentifier> token;
234
235      public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) {
236        this.token = token;
237      }
238
239      @Override
240      public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
241        NameCallback nc = null;
242        PasswordCallback pc = null;
243        RealmCallback rc = null;
244        for (Callback callback : callbacks) {
245          if (callback instanceof RealmChoiceCallback) {
246            continue;
247          } else if (callback instanceof NameCallback) {
248            nc = (NameCallback) callback;
249          } else if (callback instanceof PasswordCallback) {
250            pc = (PasswordCallback) callback;
251          } else if (callback instanceof RealmCallback) {
252            rc = (RealmCallback) callback;
253          } else {
254            throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
255          }
256        }
257        if (nc != null) {
258          nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier()));
259        }
260        if (pc != null) {
261          pc.setPassword(SaslUtil.encodePassword(token.getPassword()));
262        }
263        if (rc != null) {
264          rc.setText(rc.getDefaultText());
265        }
266      }
267    }
268
269    @Override
270    public UserInformation getUserInfo(User user) {
271      return null;
272    }
273  }
274
275  /**
276   * Server provider which validates credentials from an in-memory database.
277   */
278  public static class InMemoryServerProvider extends InMemoryClientProvider
279    implements SaslServerAuthenticationProvider {
280
281    @Override
282    public AttemptingUserProvidingSaslServer
283      createServer(SecretManager<TokenIdentifier> secretManager, Map<String, String> saslProps)
284        throws IOException {
285      return new AttemptingUserProvidingSaslServer(
286        Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null,
287          SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()),
288        () -> null);
289    }
290
291    /**
292     * Pulls the correct password for the user who started the SASL handshake so that SASL can
293     * validate that the user provided the right password.
294     */
295    private class InMemoryServerProviderCallbackHandler implements CallbackHandler {
296
297      @Override
298      public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException {
299        NameCallback nc = null;
300        PasswordCallback pc = null;
301        AuthorizeCallback ac = null;
302        for (Callback callback : callbacks) {
303          if (callback instanceof AuthorizeCallback) {
304            ac = (AuthorizeCallback) callback;
305          } else if (callback instanceof NameCallback) {
306            nc = (NameCallback) callback;
307          } else if (callback instanceof PasswordCallback) {
308            pc = (PasswordCallback) callback;
309          } else if (callback instanceof RealmCallback) {
310            continue; // realm is ignored
311          } else {
312            throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback");
313          }
314        }
315        if (nc != null && pc != null) {
316          byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName());
317          PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier();
318          try {
319            id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
320          } catch (IOException e) {
321            throw (InvalidToken) new InvalidToken("Can't de-serialize tokenIdentifier")
322              .initCause(e);
323          }
324          char[] actualPassword =
325            SaslUtil.encodePassword(Bytes.toBytes(getPassword(id.getUser().getUserName())));
326          pc.setPassword(actualPassword);
327        }
328        if (ac != null) {
329          String authid = ac.getAuthenticationID();
330          String authzid = ac.getAuthorizationID();
331          if (authid.equals(authzid)) {
332            ac.setAuthorized(true);
333          } else {
334            ac.setAuthorized(false);
335          }
336          if (ac.isAuthorized()) {
337            ac.setAuthorizedID(authzid);
338          }
339        }
340      }
341    }
342
343    @Override
344    public boolean supportsProtocolAuthentication() {
345      return false;
346    }
347
348    @Override
349    public UserGroupInformation getAuthorizedUgi(String authzId,
350      SecretManager<TokenIdentifier> secretManager) throws IOException {
351      UserGroupInformation authorizedUgi;
352      byte[] encodedId = SaslUtil.decodeIdentifier(authzId);
353      PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier();
354      try {
355        tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
356      } catch (IOException e) {
357        throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e);
358      }
359      authorizedUgi = tokenId.getUser();
360      if (authorizedUgi == null) {
361        throw new AccessDeniedException("Can't retrieve username from tokenIdentifier.");
362      }
363      authorizedUgi.addTokenIdentifier(tokenId);
364      authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod());
365      return authorizedUgi;
366    }
367  }
368
369  /**
370   * Custom provider which can select our custom provider, amongst other tokens which may be
371   * available.
372   */
373  public static class InMemoryProviderSelector extends BuiltInProviderSelector {
374    private InMemoryClientProvider inMemoryProvider;
375
376    @Override
377    public void configure(Configuration conf,
378      Collection<SaslClientAuthenticationProvider> providers) {
379      super.configure(conf, providers);
380      Optional<SaslClientAuthenticationProvider> o =
381        providers.stream().filter((p) -> p instanceof InMemoryClientProvider).findAny();
382
383      inMemoryProvider = (InMemoryClientProvider) o.orElseThrow(() -> new RuntimeException(
384        "InMemoryClientProvider not found in available providers: " + providers));
385    }
386
387    @Override
388    public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
389      selectProvider(String clusterId, User user) {
390      Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> superPair =
391        super.selectProvider(clusterId, user);
392
393      Optional<Token<? extends TokenIdentifier>> optional = inMemoryProvider.findToken(user);
394      if (optional.isPresent()) {
395        LOG.info("Using InMemoryClientProvider");
396        return new Pair<>(inMemoryProvider, optional.get());
397      }
398
399      LOG.info("InMemoryClientProvider not usable, falling back to {}", superPair);
400      return superPair;
401    }
402  }
403
404  private static void createBaseCluster(HBaseTestingUtil util, File keytabFile, MiniKdc kdc)
405    throws Exception {
406    String servicePrincipal = "hbase/localhost";
407    String spnegoPrincipal = "HTTP/localhost";
408    kdc.createPrincipal(keytabFile, servicePrincipal);
409    util.startMiniZKCluster();
410
411    HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(),
412      servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm());
413    HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class);
414
415    util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
416      TokenProvider.class.getName());
417    util.startMiniDFSCluster(1);
418    Path rootdir = util.getDataTestDirOnTestFS("TestCustomSaslAuthenticationProvider");
419    CommonFSUtils.setRootDir(util.getConfiguration(), rootdir);
420  }
421
422  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
423  private static final Configuration CONF = UTIL.getConfiguration();
424  private static LocalHBaseCluster CLUSTER;
425  private static File KEYTAB_FILE;
426
427  protected static void startCluster(String rpcServerImpl) throws Exception {
428    KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
429    final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE);
430
431    // Adds our test impls instead of creating service loader entries which
432    // might inadvertently get them loaded on a real cluster.
433    CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY,
434      InMemoryClientProvider.class.getName());
435    CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY,
436      InMemoryServerProvider.class.getName());
437    CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY,
438      InMemoryProviderSelector.class.getName());
439    createBaseCluster(UTIL, KEYTAB_FILE, kdc);
440    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
441    CLUSTER = new LocalHBaseCluster(CONF, 1);
442    CLUSTER.startup();
443  }
444
445  @AfterClass
446  public static void shutdownCluster() throws Exception {
447    if (CLUSTER != null) {
448      CLUSTER.shutdown();
449      CLUSTER = null;
450    }
451    UTIL.shutdownMiniDFSCluster();
452    UTIL.shutdownMiniZKCluster();
453    UTIL.cleanupTestDir();
454  }
455
456  @Before
457  public void setUp() throws Exception {
458    createTable();
459  }
460
461  @After
462  public void tearDown() throws IOException {
463    UTIL.deleteTable(name.getTableName());
464  }
465
466  @Rule
467  public TableNameTestRule name = new TableNameTestRule();
468
469  private TableName tableName;
470
471  private String clusterId;
472
473  private void createTable() throws Exception {
474    tableName = name.getTableName();
475
476    // Create a table and write a record as the service user (hbase)
477    UserGroupInformation serviceUgi = UserGroupInformation
478      .loginUserFromKeytabAndReturnUGI("hbase/localhost", KEYTAB_FILE.getAbsolutePath());
479    clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() {
480      @Override
481      public String run() throws Exception {
482        try (Connection conn = ConnectionFactory.createConnection(CONF);
483          Admin admin = conn.getAdmin();) {
484          admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
485            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1")).build());
486
487          UTIL.waitTableAvailable(tableName);
488
489          try (Table t = conn.getTable(tableName)) {
490            Put p = new Put(Bytes.toBytes("r1"));
491            p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
492            t.put(p);
493          }
494
495          return admin.getClusterMetrics().getClusterId();
496        }
497      }
498    });
499    assertNotNull(clusterId);
500  }
501
502  private Configuration getClientConf() {
503    Configuration conf = new Configuration(CONF);
504    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
505    return conf;
506  }
507
508  @Test
509  public void testPositiveAuthentication() throws Exception {
510    // Validate that we can read that record back out as the user with our custom auth'n
511    UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]);
512    user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId));
513    user1.doAs(new PrivilegedExceptionAction<Void>() {
514      @Override
515      public Void run() throws Exception {
516        try (Connection conn = ConnectionFactory.createConnection(getClientConf());
517          Table t = conn.getTable(tableName)) {
518          Result r = t.get(new Get(Bytes.toBytes("r1")));
519          assertNotNull(r);
520          assertFalse("Should have read a non-empty Result", r.isEmpty());
521          final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
522          assertTrue("Unexpected value", CellUtil.matchingValue(cell, Bytes.toBytes("1")));
523
524          return null;
525        }
526      }
527    });
528  }
529
530  @Test
531  public void testNegativeAuthentication() throws Exception {
532    // Validate that we can read that record back out as the user with our custom auth'n
533    UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]);
534    user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId));
535    user1.doAs(new PrivilegedExceptionAction<Void>() {
536      @Override
537      public Void run() throws Exception {
538        Configuration clientConf = getClientConf();
539        clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
540        // Depending on the registry in use, the following code can throw exceptions at different
541        // places. Master registry fails at the createConnection() step because the RPC to the
542        // master fails with sasl auth. With ZK registry, connection creation succeeds (since there
543        // is no RPC to HBase services involved) but the subsequent get() fails. The root cause
544        // should still be a SaslException in both the cases.
545        try (Connection conn = ConnectionFactory.createConnection(clientConf);
546          Table t = conn.getTable(tableName)) {
547          t.get(new Get(Bytes.toBytes("r1")));
548          fail("Should not successfully authenticate with HBase");
549        } catch (MasterRegistryFetchException mfe) {
550          Throwable cause = mfe.getCause();
551          assertTrue(cause.getMessage(), cause.getMessage().contains("SaslException"));
552        } catch (RetriesExhaustedException re) {
553          assertTrue(re.getMessage(), re.getMessage().contains("SaslException"));
554        } catch (Exception e) {
555          // Any other exception is unexpected.
556          fail("Unexpected exception caught, was expecting a authentication error: "
557            + Throwables.getStackTraceAsString(e));
558        }
559        return null;
560      }
561    });
562  }
563}