001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.provider;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.ByteArrayInputStream;
027import java.io.DataInput;
028import java.io.DataInputStream;
029import java.io.DataOutput;
030import java.io.File;
031import java.io.IOException;
032import java.net.InetAddress;
033import java.security.PrivilegedExceptionAction;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.List;
037import java.util.Map;
038import java.util.Optional;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.stream.Collectors;
041import javax.security.auth.callback.Callback;
042import javax.security.auth.callback.CallbackHandler;
043import javax.security.auth.callback.NameCallback;
044import javax.security.auth.callback.PasswordCallback;
045import javax.security.auth.callback.UnsupportedCallbackException;
046import javax.security.sasl.AuthorizeCallback;
047import javax.security.sasl.RealmCallback;
048import javax.security.sasl.RealmChoiceCallback;
049import javax.security.sasl.Sasl;
050import javax.security.sasl.SaslClient;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.Cell;
054import org.apache.hadoop.hbase.CellUtil;
055import org.apache.hadoop.hbase.HBaseTestingUtility;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.LocalHBaseCluster;
058import org.apache.hadoop.hbase.TableName;
059import org.apache.hadoop.hbase.TableNameTestRule;
060import org.apache.hadoop.hbase.client.Admin;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
062import org.apache.hadoop.hbase.client.Connection;
063import org.apache.hadoop.hbase.client.ConnectionFactory;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.Result;
067import org.apache.hadoop.hbase.client.RetriesExhaustedException;
068import org.apache.hadoop.hbase.client.Table;
069import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
070import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
071import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
072import org.apache.hadoop.hbase.ipc.NettyRpcClient;
073import org.apache.hadoop.hbase.ipc.RpcClientFactory;
074import org.apache.hadoop.hbase.ipc.RpcServerFactory;
075import org.apache.hadoop.hbase.security.AccessDeniedException;
076import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
077import org.apache.hadoop.hbase.security.SaslUtil;
078import org.apache.hadoop.hbase.security.SecurityInfo;
079import org.apache.hadoop.hbase.security.User;
080import org.apache.hadoop.hbase.security.token.SecureTestCluster;
081import org.apache.hadoop.hbase.security.token.TokenProvider;
082import org.apache.hadoop.hbase.util.Bytes;
083import org.apache.hadoop.hbase.util.CommonFSUtils;
084import org.apache.hadoop.hbase.util.Pair;
085import org.apache.hadoop.io.Text;
086import org.apache.hadoop.io.WritableUtils;
087import org.apache.hadoop.minikdc.MiniKdc;
088import org.apache.hadoop.security.UserGroupInformation;
089import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
090import org.apache.hadoop.security.token.SecretManager;
091import org.apache.hadoop.security.token.SecretManager.InvalidToken;
092import org.apache.hadoop.security.token.Token;
093import org.apache.hadoop.security.token.TokenIdentifier;
094import org.junit.After;
095import org.junit.AfterClass;
096import org.junit.Before;
097import org.junit.Rule;
098import org.junit.Test;
099import org.junit.runners.Parameterized.Parameter;
100import org.junit.runners.Parameterized.Parameters;
101import org.slf4j.Logger;
102import org.slf4j.LoggerFactory;
103
104import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
105
106import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
107
108/**
109 * Tests the pluggable authentication framework with SASL using a contrived authentication system.
 * This test holds a "user database" in memory as a hashmap. Clients provide their password in the
111 * client Hadoop configuration. The servers validate this password via the "user database".
112 */
113public abstract class CustomSaslAuthenticationProviderTestBase {
114
115  private static final Logger LOG =
116    LoggerFactory.getLogger(CustomSaslAuthenticationProviderTestBase.class);
117
118  private static final Map<String, String> USER_DATABASE = createUserDatabase();
119
120  private static final String USER1_PASSWORD = "foobarbaz";
121  private static final String USER2_PASSWORD = "bazbarfoo";
122
123  @Parameters
124  public static Collection<Object[]> parameters() {
125    return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() },
126      new Object[] { NettyRpcClient.class.getName() });
127  }
128
  /**
   * Class name of the RPC client implementation for the current parameterized run; one of the
   * values produced by {@link #parameters()}. Written into the client configuration by
   * {@code getClientConf()}.
   */
  @Parameter
  public String rpcClientImpl;
131
132  private static Map<String, String> createUserDatabase() {
133    Map<String, String> db = new ConcurrentHashMap<>();
134    db.put("user1", USER1_PASSWORD);
135    db.put("user2", USER2_PASSWORD);
136    return db;
137  }
138
139  public static String getPassword(String user) {
140    String password = USER_DATABASE.get(user);
141    if (password == null) {
142      throw new IllegalStateException("Cannot request password for a user that doesn't exist");
143    }
144    return password;
145  }
146
147  /**
148   * A custom token identifier for our custom auth'n method. Unique from the TokenIdentifier used
149   * for delegation tokens.
150   */
151  public static class PasswordAuthTokenIdentifier extends TokenIdentifier {
152    public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN");
153    private String username;
154
155    public PasswordAuthTokenIdentifier() {
156    }
157
158    public PasswordAuthTokenIdentifier(String username) {
159      this.username = username;
160    }
161
162    @Override
163    public void readFields(DataInput in) throws IOException {
164      this.username = WritableUtils.readString(in);
165    }
166
167    @Override
168    public void write(DataOutput out) throws IOException {
169      WritableUtils.writeString(out, username);
170    }
171
172    @Override
173    public Text getKind() {
174      return PASSWORD_AUTH_TOKEN;
175    }
176
177    @Override
178    public UserGroupInformation getUser() {
179      if (username == null || "".equals(username)) {
180        return null;
181      }
182      return UserGroupInformation.createRemoteUser(username);
183    }
184  }
185
186  public static Token<? extends TokenIdentifier> createPasswordToken(String username,
187    String password, String clusterId) {
188    PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(username);
189    Token<? extends TokenIdentifier> token =
190      new Token<>(id.getBytes(), Bytes.toBytes(password), id.getKind(), new Text(clusterId));
191    return token;
192  }
193
194  /**
195   * Client provider that finds custom Token in the user's UGI and authenticates with the server via
196   * DIGEST-MD5 using that password.
197   */
198  public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider {
199    public static final String MECHANISM = "DIGEST-MD5";
200    public static final SaslAuthMethod SASL_AUTH_METHOD =
201      new SaslAuthMethod("IN_MEMORY", (byte) 42, MECHANISM, AuthenticationMethod.TOKEN);
202
203    @Override
204    public SaslClient createClient(Configuration conf, InetAddress serverAddr,
205      SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
206      Map<String, String> saslProps) throws IOException {
207      return Sasl.createSaslClient(new String[] { MECHANISM }, null, null,
208        SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token));
209    }
210
211    public Optional<Token<? extends TokenIdentifier>> findToken(User user) {
212      List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream()
213        .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN))
214        .collect(Collectors.toList());
215      if (tokens.isEmpty()) {
216        return Optional.empty();
217      }
218      if (tokens.size() > 1) {
219        throw new IllegalStateException("Cannot handle more than one PasswordAuthToken");
220      }
221      return Optional.of(tokens.get(0));
222    }
223
224    @Override
225    public SaslAuthMethod getSaslAuthMethod() {
226      return SASL_AUTH_METHOD;
227    }
228
229    /**
230     * Sasl CallbackHandler which extracts information from our custom token and places it into the
231     * Sasl objects.
232     */
233    public class InMemoryClientProviderCallbackHandler implements CallbackHandler {
234      private final Token<? extends TokenIdentifier> token;
235
236      public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) {
237        this.token = token;
238      }
239
240      @Override
241      public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
242        NameCallback nc = null;
243        PasswordCallback pc = null;
244        RealmCallback rc = null;
245        for (Callback callback : callbacks) {
246          if (callback instanceof RealmChoiceCallback) {
247            continue;
248          } else if (callback instanceof NameCallback) {
249            nc = (NameCallback) callback;
250          } else if (callback instanceof PasswordCallback) {
251            pc = (PasswordCallback) callback;
252          } else if (callback instanceof RealmCallback) {
253            rc = (RealmCallback) callback;
254          } else {
255            throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
256          }
257        }
258        if (nc != null) {
259          nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier()));
260        }
261        if (pc != null) {
262          pc.setPassword(SaslUtil.encodePassword(token.getPassword()));
263        }
264        if (rc != null) {
265          rc.setText(rc.getDefaultText());
266        }
267      }
268    }
269
270    @Override
271    public UserInformation getUserInfo(User user) {
272      return null;
273    }
274  }
275
276  /**
277   * Server provider which validates credentials from an in-memory database.
278   */
279  public static class InMemoryServerProvider extends InMemoryClientProvider
280    implements SaslServerAuthenticationProvider {
281
282    @Override
283    public AttemptingUserProvidingSaslServer
284      createServer(SecretManager<TokenIdentifier> secretManager, Map<String, String> saslProps)
285        throws IOException {
286      return new AttemptingUserProvidingSaslServer(
287        Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null,
288          SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()),
289        () -> null);
290    }
291
292    /**
293     * Pulls the correct password for the user who started the SASL handshake so that SASL can
294     * validate that the user provided the right password.
295     */
296    private class InMemoryServerProviderCallbackHandler implements CallbackHandler {
297
298      @Override
299      public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException {
300        NameCallback nc = null;
301        PasswordCallback pc = null;
302        AuthorizeCallback ac = null;
303        for (Callback callback : callbacks) {
304          if (callback instanceof AuthorizeCallback) {
305            ac = (AuthorizeCallback) callback;
306          } else if (callback instanceof NameCallback) {
307            nc = (NameCallback) callback;
308          } else if (callback instanceof PasswordCallback) {
309            pc = (PasswordCallback) callback;
310          } else if (callback instanceof RealmCallback) {
311            continue; // realm is ignored
312          } else {
313            throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback");
314          }
315        }
316        if (nc != null && pc != null) {
317          byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName());
318          PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier();
319          try {
320            id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
321          } catch (IOException e) {
322            throw (InvalidToken) new InvalidToken("Can't de-serialize tokenIdentifier")
323              .initCause(e);
324          }
325          char[] actualPassword =
326            SaslUtil.encodePassword(Bytes.toBytes(getPassword(id.getUser().getUserName())));
327          pc.setPassword(actualPassword);
328        }
329        if (ac != null) {
330          String authid = ac.getAuthenticationID();
331          String authzid = ac.getAuthorizationID();
332          if (authid.equals(authzid)) {
333            ac.setAuthorized(true);
334          } else {
335            ac.setAuthorized(false);
336          }
337          if (ac.isAuthorized()) {
338            ac.setAuthorizedID(authzid);
339          }
340        }
341      }
342    }
343
344    @Override
345    public boolean supportsProtocolAuthentication() {
346      return false;
347    }
348
349    @Override
350    public UserGroupInformation getAuthorizedUgi(String authzId,
351      SecretManager<TokenIdentifier> secretManager) throws IOException {
352      UserGroupInformation authorizedUgi;
353      byte[] encodedId = SaslUtil.decodeIdentifier(authzId);
354      PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier();
355      try {
356        tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
357      } catch (IOException e) {
358        throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e);
359      }
360      authorizedUgi = tokenId.getUser();
361      if (authorizedUgi == null) {
362        throw new AccessDeniedException("Can't retrieve username from tokenIdentifier.");
363      }
364      authorizedUgi.addTokenIdentifier(tokenId);
365      authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod());
366      return authorizedUgi;
367    }
368  }
369
370  /**
371   * Custom provider which can select our custom provider, amongst other tokens which may be
372   * available.
373   */
374  public static class InMemoryProviderSelector extends BuiltInProviderSelector {
375    private InMemoryClientProvider inMemoryProvider;
376
377    @Override
378    public void configure(Configuration conf,
379      Collection<SaslClientAuthenticationProvider> providers) {
380      super.configure(conf, providers);
381      Optional<SaslClientAuthenticationProvider> o =
382        providers.stream().filter((p) -> p instanceof InMemoryClientProvider).findAny();
383
384      inMemoryProvider = (InMemoryClientProvider) o.orElseThrow(() -> new RuntimeException(
385        "InMemoryClientProvider not found in available providers: " + providers));
386    }
387
388    @Override
389    public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
390      selectProvider(String clusterId, User user) {
391      Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> superPair =
392        super.selectProvider(clusterId, user);
393
394      Optional<Token<? extends TokenIdentifier>> optional = inMemoryProvider.findToken(user);
395      if (optional.isPresent()) {
396        LOG.info("Using InMemoryClientProvider");
397        return new Pair<>(inMemoryProvider, optional.get());
398      }
399
400      LOG.info("InMemoryClientProvider not usable, falling back to {}", superPair);
401      return superPair;
402    }
403  }
404
405  private static void createBaseCluster(HBaseTestingUtility util, File keytabFile, MiniKdc kdc)
406    throws Exception {
407    String servicePrincipal = "hbase/localhost";
408    String spnegoPrincipal = "HTTP/localhost";
409    kdc.createPrincipal(keytabFile, servicePrincipal);
410    util.startMiniZKCluster();
411
412    HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(),
413      servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm());
414    HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class);
415
416    util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
417      TokenProvider.class.getName());
418    util.startMiniDFSCluster(1);
419    Path rootdir = util.getDataTestDirOnTestFS("TestCustomSaslAuthenticationProvider");
420    CommonFSUtils.setRootDir(util.getConfiguration(), rootdir);
421  }
422
423  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
424  private static final Configuration CONF = UTIL.getConfiguration();
425  private static LocalHBaseCluster CLUSTER;
426  private static File KEYTAB_FILE;
427
428  protected static void startCluster(String rpcServerImpl) throws Exception {
429    KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
430    final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE);
431
432    // Adds our test impls instead of creating service loader entries which
433    // might inadvertently get them loaded on a real cluster.
434    CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY,
435      InMemoryClientProvider.class.getName());
436    CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY,
437      InMemoryServerProvider.class.getName());
438    CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY,
439      InMemoryProviderSelector.class.getName());
440    CONF.setLong(HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN, 600);
441    createBaseCluster(UTIL, KEYTAB_FILE, kdc);
442    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
443    CLUSTER = new LocalHBaseCluster(CONF, 1);
444    CLUSTER.startup();
445  }
446
447  @AfterClass
448  public static void shutdownCluster() throws Exception {
449    if (CLUSTER != null) {
450      CLUSTER.shutdown();
451      CLUSTER = null;
452    }
453    UTIL.shutdownMiniDFSCluster();
454    UTIL.shutdownMiniZKCluster();
455    UTIL.cleanupTestDir();
456  }
457
  /** Creates the table for the current test (see createTable()) before each test method. */
  @Before
  public void setUp() throws Exception {
    createTable();
  }

  /** Deletes the table named by the {@link #name} rule after each test method. */
  @After
  public void tearDown() throws IOException {
    UTIL.deleteTable(name.getTableName());
  }

  /** Rule supplying the table name used by the current test. */
  @Rule
  public TableNameTestRule name = new TableNameTestRule();

  // Table used by the current test; assigned at the start of createTable().
  private TableName tableName;

  // Cluster id obtained in createTable(); the tests pass it to createPasswordToken().
  private String clusterId;
474
475  private void createTable() throws Exception {
476    tableName = name.getTableName();
477
478    // Create a table and write a record as the service user (hbase)
479    UserGroupInformation serviceUgi = UserGroupInformation
480      .loginUserFromKeytabAndReturnUGI("hbase/localhost", KEYTAB_FILE.getAbsolutePath());
481    clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() {
482      @Override
483      public String run() throws Exception {
484        try (Connection conn = ConnectionFactory.createConnection(CONF);
485          Admin admin = conn.getAdmin();) {
486          admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
487            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1")).build());
488
489          UTIL.waitTableAvailable(tableName);
490
491          try (Table t = conn.getTable(tableName)) {
492            Put p = new Put(Bytes.toBytes("r1"));
493            p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
494            t.put(p);
495          }
496
497          return admin.getClusterMetrics().getClusterId();
498        }
499      }
500    });
501    assertNotNull(clusterId);
502  }
503
504  private Configuration getClientConf() {
505    Configuration conf = new Configuration(CONF);
506    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
507    return conf;
508  }
509
510  @Test
511  public void testPositiveAuthentication() throws Exception {
512    // Validate that we can read that record back out as the user with our custom auth'n
513    UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]);
514    user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId));
515    user1.doAs(new PrivilegedExceptionAction<Void>() {
516      @Override
517      public Void run() throws Exception {
518        try (Connection conn = ConnectionFactory.createConnection(getClientConf());
519          Table t = conn.getTable(tableName)) {
520          Result r = t.get(new Get(Bytes.toBytes("r1")));
521          assertNotNull(r);
522          assertFalse("Should have read a non-empty Result", r.isEmpty());
523          final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
524          assertTrue("Unexpected value", CellUtil.matchingValue(cell, Bytes.toBytes("1")));
525
526          return null;
527        }
528      }
529    });
530  }
531
532  @Test
533  public void testNegativeAuthentication() throws Exception {
534    // Validate that we can read that record back out as the user with our custom auth'n
535    final Configuration clientConf = new Configuration(CONF);
536    // This test does not work with master registry in branch-2 because of a nuance in the non-async
537    // connection implementation. See the detail below.
538    clientConf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
539      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
540    clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
541    UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]);
542    user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId));
543    user1.doAs(new PrivilegedExceptionAction<Void>() {
544      @Override
545      public Void run() throws Exception {
546        // There is a slight behavioral difference here in the 3.x vs 2.x branches. 3.x branches
547        // use async client connection implementation which throws if there is an exception when
548        // fetching the clusterId(). 2.x branches that use non-async client falls back to using a
549        // DEFAULT cluster ID in such cases. 3.x behavior makes more sense, especially if the
550        // exception is of type InvalidToken (digest mis-match), however I did not want to fix it
551        // since it makes sense only when master registry is in use (which has RPCs to master).
552        // That is the reason if you see a slight difference in the test between 3.x and 2.x.
553        try (Connection conn = ConnectionFactory.createConnection(clientConf);
554          Table t = conn.getTable(tableName)) {
555          t.get(new Get(Bytes.toBytes("r1")));
556          fail("Should not successfully authenticate with HBase");
557        } catch (RetriesExhaustedException re) {
558          assertTrue(re.getMessage(), re.getMessage().contains("SaslException"));
559        } catch (Exception e) {
560          // Any other exception is unexpected.
561          fail("Unexpected exception caught, was expecting a authentication error: "
562            + Throwables.getStackTraceAsString(e));
563        }
564        return null;
565      }
566    });
567  }
568}