001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.provider;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.ByteArrayInputStream;
027import java.io.DataInput;
028import java.io.DataInputStream;
029import java.io.DataOutput;
030import java.io.File;
031import java.io.IOException;
032import java.net.InetAddress;
033import java.security.PrivilegedExceptionAction;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.List;
038import java.util.Map;
039import java.util.Optional;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.stream.Collectors;
042import javax.security.auth.callback.Callback;
043import javax.security.auth.callback.CallbackHandler;
044import javax.security.auth.callback.NameCallback;
045import javax.security.auth.callback.PasswordCallback;
046import javax.security.auth.callback.UnsupportedCallbackException;
047import javax.security.sasl.AuthorizeCallback;
048import javax.security.sasl.RealmCallback;
049import javax.security.sasl.RealmChoiceCallback;
050import javax.security.sasl.Sasl;
051import javax.security.sasl.SaslClient;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.hbase.Cell;
055import org.apache.hadoop.hbase.CellUtil;
056import org.apache.hadoop.hbase.HBaseClassTestRule;
057import org.apache.hadoop.hbase.HBaseTestingUtility;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.LocalHBaseCluster;
060import org.apache.hadoop.hbase.TableName;
061import org.apache.hadoop.hbase.TableNameTestRule;
062import org.apache.hadoop.hbase.client.Admin;
063import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
064import org.apache.hadoop.hbase.client.Connection;
065import org.apache.hadoop.hbase.client.ConnectionFactory;
066import org.apache.hadoop.hbase.client.Get;
067import org.apache.hadoop.hbase.client.Put;
068import org.apache.hadoop.hbase.client.Result;
069import org.apache.hadoop.hbase.client.RetriesExhaustedException;
070import org.apache.hadoop.hbase.client.Table;
071import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
072import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
073import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
074import org.apache.hadoop.hbase.ipc.NettyRpcClient;
075import org.apache.hadoop.hbase.ipc.NettyRpcServer;
076import org.apache.hadoop.hbase.ipc.RpcClientFactory;
077import org.apache.hadoop.hbase.ipc.RpcServerFactory;
078import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
079import org.apache.hadoop.hbase.security.AccessDeniedException;
080import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
081import org.apache.hadoop.hbase.security.SaslUtil;
082import org.apache.hadoop.hbase.security.SecurityInfo;
083import org.apache.hadoop.hbase.security.User;
084import org.apache.hadoop.hbase.security.token.SecureTestCluster;
085import org.apache.hadoop.hbase.security.token.TokenProvider;
086import org.apache.hadoop.hbase.testclassification.MediumTests;
087import org.apache.hadoop.hbase.testclassification.SecurityTests;
088import org.apache.hadoop.hbase.util.Bytes;
089import org.apache.hadoop.hbase.util.CommonFSUtils;
090import org.apache.hadoop.hbase.util.Pair;
091import org.apache.hadoop.io.Text;
092import org.apache.hadoop.io.WritableUtils;
093import org.apache.hadoop.minikdc.MiniKdc;
094import org.apache.hadoop.security.UserGroupInformation;
095import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
096import org.apache.hadoop.security.token.SecretManager;
097import org.apache.hadoop.security.token.SecretManager.InvalidToken;
098import org.apache.hadoop.security.token.Token;
099import org.apache.hadoop.security.token.TokenIdentifier;
100import org.junit.After;
101import org.junit.AfterClass;
102import org.junit.Before;
103import org.junit.BeforeClass;
104import org.junit.ClassRule;
105import org.junit.Rule;
106import org.junit.Test;
107import org.junit.experimental.categories.Category;
108import org.junit.runner.RunWith;
109import org.junit.runners.Parameterized;
110import org.slf4j.Logger;
111import org.slf4j.LoggerFactory;
112
113import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
114
115import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
116
117/**
118 * Tests the pluggable authentication framework with SASL using a contrived authentication system.
119 *
120 * This tests holds a "user database" in memory as a hashmap. Clients provide their password
121 * in the client Hadoop configuration. The servers validate this password via the "user database".
122 */
123@RunWith(Parameterized.class)
124@Category({MediumTests.class, SecurityTests.class})
125public class TestCustomSaslAuthenticationProvider {
126  private static final Logger LOG = LoggerFactory.getLogger(
127      TestCustomSaslAuthenticationProvider.class);
128
129  @ClassRule
130  public static final HBaseClassTestRule CLASS_RULE =
131      HBaseClassTestRule.forClass(TestCustomSaslAuthenticationProvider.class);
132
133  private static final Map<String,String> USER_DATABASE = createUserDatabase();
134
135  private static final String USER1_PASSWORD = "foobarbaz";
136  private static final String USER2_PASSWORD = "bazbarfoo";
137
138  @Parameterized.Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
139  public static Collection<Object[]> parameters() {
140    List<Object[]> params = new ArrayList<>();
141    List<String> rpcClientImpls = Arrays.asList(
142        BlockingRpcClient.class.getName(), NettyRpcClient.class.getName());
143    List<String> rpcServerImpls = Arrays.asList(
144        SimpleRpcServer.class.getName(), NettyRpcServer.class.getName());
145    for (String rpcClientImpl : rpcClientImpls) {
146      for (String rpcServerImpl : rpcServerImpls) {
147        params.add(new Object[] { rpcClientImpl, rpcServerImpl });
148      }
149    }
150    return params;
151  }
152
153  @Parameterized.Parameter(0)
154  public String rpcClientImpl;
155
156  @Parameterized.Parameter(1)
157  public String rpcServerImpl;
158
159  private static Map<String,String> createUserDatabase() {
160    Map<String,String> db = new ConcurrentHashMap<>();
161    db.put("user1", USER1_PASSWORD);
162    db.put("user2", USER2_PASSWORD);
163    return db;
164  }
165
166  public static String getPassword(String user) {
167    String password = USER_DATABASE.get(user);
168    if (password == null) {
169      throw new IllegalStateException("Cannot request password for a user that doesn't exist");
170    }
171    return password;
172  }
173
174  /**
175   * A custom token identifier for our custom auth'n method. Unique from the TokenIdentifier
176   * used for delegation tokens.
177   */
178  public static class PasswordAuthTokenIdentifier extends TokenIdentifier {
179    public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN");
180    private String username;
181
182    public PasswordAuthTokenIdentifier() {}
183
184    public PasswordAuthTokenIdentifier(String username) {
185      this.username = username;
186    }
187
188    @Override
189    public void readFields(DataInput in) throws IOException {
190      this.username = WritableUtils.readString(in);
191    }
192
193    @Override
194    public void write(DataOutput out) throws IOException {
195      WritableUtils.writeString(out, username);
196    }
197
198    @Override
199    public Text getKind() {
200      return PASSWORD_AUTH_TOKEN;
201    }
202
203    @Override
204    public UserGroupInformation getUser() {
205      if (username == null || "".equals(username)) {
206        return null;
207      }
208      return UserGroupInformation.createRemoteUser(username);
209    }
210  }
211
212  public static Token<? extends TokenIdentifier> createPasswordToken(
213      String username, String password, String clusterId) {
214    PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(username);
215    Token<? extends TokenIdentifier> token = new Token<>(id.getBytes(), Bytes.toBytes(password),
216        id.getKind(), new Text(clusterId));
217    return token;
218  }
219
220  /**
221   * Client provider that finds custom Token in the user's UGI and authenticates with the server
222   * via DIGEST-MD5 using that password.
223   */
224  public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider {
225    public static final String MECHANISM = "DIGEST-MD5";
226    public static final SaslAuthMethod SASL_AUTH_METHOD = new SaslAuthMethod(
227        "IN_MEMORY", (byte)42, MECHANISM, AuthenticationMethod.TOKEN);
228
229    @Override
230    public SaslClient createClient(Configuration conf, InetAddress serverAddr,
231        SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
232        Map<String, String> saslProps) throws IOException {
233      return Sasl.createSaslClient(new String[] { MECHANISM }, null, null,
234          SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token));
235    }
236
237    public Optional<Token<? extends TokenIdentifier>> findToken(User user) {
238      List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream()
239        .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN))
240        .collect(Collectors.toList());
241      if (tokens.isEmpty()) {
242        return Optional.empty();
243      }
244      if (tokens.size() > 1) {
245        throw new IllegalStateException("Cannot handle more than one PasswordAuthToken");
246      }
247      return Optional.of(tokens.get(0));
248    }
249
250    @Override
251    public SaslAuthMethod getSaslAuthMethod() {
252      return SASL_AUTH_METHOD;
253    }
254
255    /**
256     * Sasl CallbackHandler which extracts information from our custom token and places
257     * it into the Sasl objects.
258     */
259    public class InMemoryClientProviderCallbackHandler implements CallbackHandler {
260      private final Token<? extends TokenIdentifier> token;
261      public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) {
262        this.token = token;
263      }
264
265      @Override
266      public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
267        NameCallback nc = null;
268        PasswordCallback pc = null;
269        RealmCallback rc = null;
270        for (Callback callback : callbacks) {
271          if (callback instanceof RealmChoiceCallback) {
272            continue;
273          } else if (callback instanceof NameCallback) {
274            nc = (NameCallback) callback;
275          } else if (callback instanceof PasswordCallback) {
276            pc = (PasswordCallback) callback;
277          } else if (callback instanceof RealmCallback) {
278            rc = (RealmCallback) callback;
279          } else {
280            throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
281          }
282        }
283        if (nc != null) {
284          nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier()));
285        }
286        if (pc != null) {
287          pc.setPassword(SaslUtil.encodePassword(token.getPassword()));
288        }
289        if (rc != null) {
290          rc.setText(rc.getDefaultText());
291        }
292      }
293    }
294
295    @Override
296    public UserInformation getUserInfo(User user) {
297      return null;
298    }
299  }
300
301  /**
302   * Server provider which validates credentials from an in-memory database.
303   */
304  public static class InMemoryServerProvider extends InMemoryClientProvider
305      implements SaslServerAuthenticationProvider {
306
307    @Override
308    public AttemptingUserProvidingSaslServer createServer(
309        SecretManager<TokenIdentifier> secretManager,
310        Map<String, String> saslProps) throws IOException {
311      return new AttemptingUserProvidingSaslServer(
312          Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null,
313              SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()),
314        () -> null);
315    }
316
317    /**
318     * Pulls the correct password for the user who started the SASL handshake so that SASL
319     * can validate that the user provided the right password.
320     */
321    private class InMemoryServerProviderCallbackHandler implements CallbackHandler {
322
323      @Override
324      public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException {
325        NameCallback nc = null;
326        PasswordCallback pc = null;
327        AuthorizeCallback ac = null;
328        for (Callback callback : callbacks) {
329          if (callback instanceof AuthorizeCallback) {
330            ac = (AuthorizeCallback) callback;
331          } else if (callback instanceof NameCallback) {
332            nc = (NameCallback) callback;
333          } else if (callback instanceof PasswordCallback) {
334            pc = (PasswordCallback) callback;
335          } else if (callback instanceof RealmCallback) {
336            continue; // realm is ignored
337          } else {
338            throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback");
339          }
340        }
341        if (nc != null && pc != null) {
342          byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName());
343          PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier();
344          try {
345            id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
346          } catch (IOException e) {
347            throw (InvalidToken) new InvalidToken(
348                "Can't de-serialize tokenIdentifier").initCause(e);
349          }
350          char[] actualPassword = SaslUtil.encodePassword(
351              Bytes.toBytes(getPassword(id.getUser().getUserName())));
352          pc.setPassword(actualPassword);
353        }
354        if (ac != null) {
355          String authid = ac.getAuthenticationID();
356          String authzid = ac.getAuthorizationID();
357          if (authid.equals(authzid)) {
358            ac.setAuthorized(true);
359          } else {
360            ac.setAuthorized(false);
361          }
362          if (ac.isAuthorized()) {
363            ac.setAuthorizedID(authzid);
364          }
365        }
366      }
367    }
368
369    @Override
370    public boolean supportsProtocolAuthentication() {
371      return false;
372    }
373
374    @Override
375    public UserGroupInformation getAuthorizedUgi(String authzId,
376        SecretManager<TokenIdentifier> secretManager) throws IOException {
377      UserGroupInformation authorizedUgi;
378      byte[] encodedId = SaslUtil.decodeIdentifier(authzId);
379      PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier();
380      try {
381        tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
382      } catch (IOException e) {
383        throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e);
384      }
385      authorizedUgi = tokenId.getUser();
386      if (authorizedUgi == null) {
387        throw new AccessDeniedException(
388            "Can't retrieve username from tokenIdentifier.");
389      }
390      authorizedUgi.addTokenIdentifier(tokenId);
391      authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod());
392      return authorizedUgi;
393    }
394  }
395
396  /**
397   * Custom provider which can select our custom provider, amongst other tokens which
398   * may be available.
399   */
400  public static class InMemoryProviderSelector extends BuiltInProviderSelector {
401    private InMemoryClientProvider inMemoryProvider;
402
403    @Override
404    public void configure(Configuration conf,
405        Collection<SaslClientAuthenticationProvider> providers) {
406      super.configure(conf, providers);
407      Optional<SaslClientAuthenticationProvider> o = providers.stream()
408        .filter((p) -> p instanceof InMemoryClientProvider)
409        .findAny();
410
411      inMemoryProvider = (InMemoryClientProvider) o.orElseThrow(
412        () -> new RuntimeException("InMemoryClientProvider not found in available providers: "
413              + providers));
414    }
415
416    @Override
417    public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> selectProvider(
418        String clusterId, User user) {
419      Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> superPair =
420          super.selectProvider(clusterId, user);
421
422      Optional<Token<? extends TokenIdentifier>> optional = inMemoryProvider.findToken(user);
423      if (optional.isPresent()) {
424        LOG.info("Using InMemoryClientProvider");
425        return new Pair<>(inMemoryProvider, optional.get());
426      }
427
428      LOG.info("InMemoryClientProvider not usable, falling back to {}", superPair);
429      return superPair;
430    }
431  }
432
433  static void createBaseCluster(HBaseTestingUtility util, File keytabFile,
434      MiniKdc kdc) throws Exception {
435    String servicePrincipal = "hbase/localhost";
436    String spnegoPrincipal = "HTTP/localhost";
437    kdc.createPrincipal(keytabFile, servicePrincipal);
438    util.startMiniZKCluster();
439
440    HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(),
441        servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm());
442    HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class);
443
444    util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
445        TokenProvider.class.getName());
446    util.startMiniDFSCluster(1);
447    Path rootdir = util.getDataTestDirOnTestFS("TestGenerateDelegationToken");
448    CommonFSUtils.setRootDir(util.getConfiguration(), rootdir);
449  }
450
451  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
452  private static final Configuration CONF = UTIL.getConfiguration();
453  private static LocalHBaseCluster CLUSTER;
454  private static File KEYTAB_FILE;
455
456  @BeforeClass
457  public static void setupCluster() throws Exception {
458    KEYTAB_FILE = new File(
459        UTIL.getDataTestDir("keytab").toUri().getPath());
460    final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE);
461
462    // Adds our test impls instead of creating service loader entries which
463    // might inadvertently get them loaded on a real cluster.
464    CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY,
465        InMemoryClientProvider.class.getName());
466    CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY,
467        InMemoryServerProvider.class.getName());
468    CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY,
469        InMemoryProviderSelector.class.getName());
470    CONF.setLong(HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN, 600);
471    createBaseCluster(UTIL, KEYTAB_FILE, kdc);
472  }
473
474  @Before
475  public void setUpBeforeTest() throws Exception {
476    CONF.unset(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY);
477    CONF.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
478    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
479    if (rpcClientImpl.equals(BlockingRpcClient.class.getName())) {
480      // Set the connection registry to ZKConnectionRegistry since hedging is not supported on
481      // blocking rpc clients.
482      CONF.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
483          HConstants.ZK_CONNECTION_REGISTRY_CLASS);
484    }
485    CLUSTER = new LocalHBaseCluster(CONF, 1);
486    CLUSTER.startup();
487    createTable();
488  }
489
490  @AfterClass
491  public static void teardownCluster() throws Exception {
492    if (CLUSTER != null) {
493      CLUSTER.shutdown();
494      CLUSTER = null;
495    }
496    UTIL.shutdownMiniZKCluster();
497  }
498
499  @After
500  public void shutDownCluster() throws IOException {
501    if (CLUSTER != null) {
502      UTIL.deleteTable(name.getTableName());
503      CLUSTER.shutdown();
504    }
505  }
506
507  @Rule
508  public TableNameTestRule name = new TableNameTestRule();
509  TableName tableName;
510  String clusterId;
511
512  public void createTable() throws Exception {
513    tableName = name.getTableName();
514
515    // Create a table and write a record as the service user (hbase)
516    UserGroupInformation serviceUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
517        "hbase/localhost", KEYTAB_FILE.getAbsolutePath());
518    clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() {
519      @Override public String run() throws Exception {
520        try (Connection conn = ConnectionFactory.createConnection(CONF);
521            Admin admin = conn.getAdmin();) {
522          admin.createTable(TableDescriptorBuilder
523              .newBuilder(tableName)
524              .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"))
525              .build());
526
527          UTIL.waitTableAvailable(tableName);
528
529          try (Table t = conn.getTable(tableName)) {
530            Put p = new Put(Bytes.toBytes("r1"));
531            p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
532            t.put(p);
533          }
534
535          return admin.getClusterMetrics().getClusterId();
536        }
537      }
538    });
539
540    assertNotNull(clusterId);
541  }
542
543  @Test
544  public void testPositiveAuthentication() throws Exception {
545    // Validate that we can read that record back out as the user with our custom auth'n
546    final Configuration clientConf = new Configuration(CONF);
547    UserGroupInformation user1 = UserGroupInformation.createUserForTesting(
548        "user1", new String[0]);
549    user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId));
550    user1.doAs(new PrivilegedExceptionAction<Void>() {
551      @Override public Void run() throws Exception {
552        try (Connection conn = ConnectionFactory.createConnection(clientConf);
553            Table t = conn.getTable(tableName)) {
554          Result r = t.get(new Get(Bytes.toBytes("r1")));
555          assertNotNull(r);
556          assertFalse("Should have read a non-empty Result", r.isEmpty());
557          final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
558          assertTrue("Unexpected value", CellUtil.matchingValue(cell, Bytes.toBytes("1")));
559
560          return null;
561        }
562      }
563    });
564  }
565
566  @org.junit.Ignore @Test // See HBASE-24047 and its sub-issue to reenable.
567  public void testNegativeAuthentication() throws Exception {
568    // Validate that we can read that record back out as the user with our custom auth'n
569    final Configuration clientConf = new Configuration(CONF);
570    // This test does not work with master registry in branch-2 because of a nuance in the non-async
571    // connection implementation. See the detail below.
572    clientConf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
573        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
574    clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
575    UserGroupInformation user1 = UserGroupInformation.createUserForTesting(
576        "user1", new String[0]);
577    user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId));
578    user1.doAs(new PrivilegedExceptionAction<Void>() {
579      @Override public Void run() throws Exception {
580        // There is a slight behavioral difference here in the 3.x vs 2.x branches. 3.x branches
581        // use async client connection implementation which throws if there is an exception when
582        // fetching the clusterId(). 2.x branches that use non-async client falls back to using a
583        // DEFAULT cluster ID in such cases. 3.x behavior makes more sense, especially if the
584        // exception is of type InvalidToken (digest mis-match), however I did not want to fix it
585        // since it makes sense only when master registry is in use (which has RPCs to master).
586        // That is the reason if you see a slight difference in the test between 3.x and 2.x.
587        try (Connection conn = ConnectionFactory.createConnection(clientConf);
588            Table t = conn.getTable(tableName)) {
589          t.get(new Get(Bytes.toBytes("r1")));
590          fail("Should not successfully authenticate with HBase");
591        } catch (RetriesExhaustedException re) {
592          assertTrue(re.getMessage(), re.getMessage().contains("SaslException"));
593        } catch (Exception e) {
594          // Any other exception is unexpected.
595          fail("Unexpected exception caught, was expecting a authentication error: "
596              + Throwables.getStackTraceAsString(e));
597        }
598        return null;
599      }
600    });
601  }
602}