001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.token;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertFalse;
023import static org.junit.jupiter.api.Assertions.assertNotNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.net.InetSocketAddress;
030import java.util.ArrayList;
031import java.util.List;
032import java.util.stream.Stream;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.hbase.ChoreService;
036import org.apache.hadoop.hbase.ClusterId;
037import org.apache.hadoop.hbase.CoordinatedStateManager;
038import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.Server;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.client.AsyncClusterConnection;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionFactory;
046import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
047import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
048import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
049import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
050import org.apache.hadoop.hbase.ipc.NettyRpcServer;
051import org.apache.hadoop.hbase.ipc.RpcServer;
052import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
053import org.apache.hadoop.hbase.ipc.RpcServerFactory;
054import org.apache.hadoop.hbase.ipc.RpcServerInterface;
055import org.apache.hadoop.hbase.ipc.ServerRpcController;
056import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
057import org.apache.hadoop.hbase.keymeta.KeyManagementService;
058import org.apache.hadoop.hbase.log.HBaseMarkers;
059import org.apache.hadoop.hbase.regionserver.RegionServerServices;
060import org.apache.hadoop.hbase.security.SecurityInfo;
061import org.apache.hadoop.hbase.security.User;
062import org.apache.hadoop.hbase.testclassification.SecurityTests;
063import org.apache.hadoop.hbase.testclassification.SmallTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.hadoop.hbase.util.Sleeper;
067import org.apache.hadoop.hbase.util.Strings;
068import org.apache.hadoop.hbase.util.Threads;
069import org.apache.hadoop.hbase.util.Writables;
070import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
071import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
072import org.apache.hadoop.net.DNS;
073import org.apache.hadoop.security.authorize.PolicyProvider;
074import org.apache.hadoop.security.authorize.Service;
075import org.apache.hadoop.security.token.SecretManager;
076import org.apache.hadoop.security.token.Token;
077import org.apache.hadoop.security.token.TokenIdentifier;
078import org.junit.jupiter.api.AfterEach;
079import org.junit.jupiter.api.BeforeEach;
080import org.junit.jupiter.api.Tag;
081import org.junit.jupiter.api.TestTemplate;
082import org.junit.jupiter.params.provider.Arguments;
083import org.mockito.Mockito;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
088import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
089import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
090import org.apache.hbase.thirdparty.com.google.protobuf.Message;
091import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
092import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
093
094import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos;
095
096/**
097 * Tests for authentication token creation and usage
098 */
099// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer
100// to offer up. It worked find pre-hbase-2.0.0 but post the shading project, it fails because
101// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded
102// protobufs. Since hbase-2.0.0, we added convertion from shaded to non-shaded so this test keeps
103// working.
104@Tag(SecurityTests.TAG)
105@Tag(SmallTests.TAG)
106@HBaseParameterizedTestTemplate(name = "{index}: rpcServerImpl={0}")
107public class TestTokenAuthentication {
108
109  static {
110    // Setting whatever system properties after recommendation from
111    // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html
112    System.setProperty("java.security.krb5.realm", "hbase");
113    System.setProperty("java.security.krb5.kdc", "blah");
114  }
115
116  /**
117   * Basic server process for RPC authentication testing
118   */
119  private static class TokenServer extends TokenProvider
120    implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
121    private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class);
122    private Configuration conf;
123    private RpcServerInterface rpcServer;
124    private InetSocketAddress isa;
125    private ZKWatcher zookeeper;
126    private Sleeper sleeper;
127    private boolean started = false;
128    private boolean aborted = false;
129    private boolean stopped = false;
130    private long startcode;
131
132    public TokenServer(Configuration conf) throws IOException {
133      this.conf = conf;
134      this.startcode = EnvironmentEdgeManager.currentTime();
135      // Server to handle client requests.
136      String hostname =
137        Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
138      int port = 0;
139      // Creation of an ISA will force a resolve.
140      InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
141      if (initialIsa.getAddress() == null) {
142        throw new IllegalArgumentException("Failed resolve of " + initialIsa);
143      }
144      final List<BlockingServiceAndInterface> sai = new ArrayList<>(1);
145      // Make a proxy to go between the shaded Service that rpc expects and the
146      // non-shaded Service this CPEP is providing. This is because this test does a neat
147      // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
148      // worked fine before we shaded PB. Now we need these proxies.
149      final BlockingService service =
150        AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
151      final BlockingService proxy = new BlockingService() {
152        @Override
153        public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
154          Message param) throws ServiceException {
155          MethodDescriptor methodDescriptor =
156            service.getDescriptorForType().findMethodByName(md.getName());
157          Message request = service.getRequestPrototype(methodDescriptor);
158          // TODO: Convert rpcController
159          Message response = null;
160          try {
161            response = service.callBlockingMethod(methodDescriptor, null, request);
162          } catch (ServiceException e) {
163            throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e);
164          }
165          return null;// Convert 'response'.
166        }
167
168        @Override
169        public ServiceDescriptor getDescriptorForType() {
170          return null;
171        }
172
173        @Override
174        public Message getRequestPrototype(MethodDescriptor arg0) {
175          // TODO Auto-generated method stub
176          return null;
177        }
178
179        @Override
180        public Message getResponsePrototype(MethodDescriptor arg0) {
181          // TODO Auto-generated method stub
182          return null;
183        }
184      };
185      sai.add(new BlockingServiceAndInterface(proxy,
186        AuthenticationProtos.AuthenticationService.BlockingInterface.class));
187      this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf,
188        new FifoRpcScheduler(conf, 1));
189      InetSocketAddress address = rpcServer.getListenerAddress();
190      if (address == null) {
191        throw new IOException("Listener channel is closed");
192      }
193      this.isa = address;
194      this.sleeper = new Sleeper(1000, this);
195    }
196
197    @Override
198    public Configuration getConfiguration() {
199      return conf;
200    }
201
202    @Override
203    public Connection getConnection() {
204      return null;
205    }
206
207    @Override
208    public ZKWatcher getZooKeeper() {
209      return zookeeper;
210    }
211
212    @Override
213    public CoordinatedStateManager getCoordinatedStateManager() {
214      return null;
215    }
216
217    @Override
218    public boolean isAborted() {
219      return aborted;
220    }
221
222    @Override
223    public ServerName getServerName() {
224      return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
225    }
226
227    @Override
228    public FileSystem getFileSystem() {
229      return null;
230    }
231
232    @Override
233    public boolean isStopping() {
234      return this.stopped;
235    }
236
237    @Override
238    public void abort(String reason, Throwable error) {
239      LOG.error(HBaseMarkers.FATAL, "Aborting on: " + reason, error);
240      this.aborted = true;
241      this.stopped = true;
242      sleeper.skipSleepCycle();
243    }
244
245    private void initialize() throws IOException {
246      // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
247      Configuration zkConf = new Configuration(conf);
248      zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
249      this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(), this, true);
250      this.rpcServer.start();
251
252      // Mock up region coprocessor environment
253      RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class,
254        Mockito.withSettings().extraInterfaces(HasRegionServerServices.class));
255      when(mockRegionCpEnv.getConfiguration()).thenReturn(conf);
256      when(mockRegionCpEnv.getClassLoader())
257        .then((var1) -> Thread.currentThread().getContextClassLoader());
258      RegionServerServices mockRss = mock(RegionServerServices.class);
259      when(mockRss.getRpcServer()).thenReturn(rpcServer);
260      when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices())
261        .thenReturn(mockRss);
262
263      super.start(mockRegionCpEnv);
264      started = true;
265    }
266
267    @Override
268    public void run() {
269      try {
270        initialize();
271        while (!stopped) {
272          this.sleeper.sleep();
273        }
274      } catch (Exception e) {
275        abort(e.getMessage(), e);
276      }
277      this.rpcServer.stop();
278    }
279
280    public boolean isStarted() {
281      return started;
282    }
283
284    @Override
285    public void stop(String reason) {
286      LOG.info("Stopping due to: " + reason);
287      this.stopped = true;
288      sleeper.skipSleepCycle();
289    }
290
291    @Override
292    public boolean isStopped() {
293      return stopped;
294    }
295
296    public SecretManager<? extends TokenIdentifier> getSecretManager() {
297      return ((RpcServer) rpcServer).getSecretManager();
298    }
299
300    @Override
301    public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
302      RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
303      throws ServiceException {
304      LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null));
305      // Ignore above passed in controller -- it is always null
306      ServerRpcController serverController = new ServerRpcController();
307      final BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback =
308        new BlockingRpcCallback<>();
309      getAuthenticationToken(null, request, callback);
310      try {
311        serverController.checkFailed();
312        return callback.get();
313      } catch (IOException ioe) {
314        throw new ServiceException(ioe);
315      }
316    }
317
318    @Override
319    public AuthenticationProtos.WhoAmIResponse whoAmI(RpcController controller,
320      AuthenticationProtos.WhoAmIRequest request) throws ServiceException {
321      LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null));
322      // Ignore above passed in controller -- it is always null
323      ServerRpcController serverController = new ServerRpcController();
324      BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
325        new BlockingRpcCallback<>();
326      whoAmI(null, request, callback);
327      try {
328        serverController.checkFailed();
329        return callback.get();
330      } catch (IOException ioe) {
331        throw new ServiceException(ioe);
332      }
333    }
334
335    @Override
336    public ChoreService getChoreService() {
337      return null;
338    }
339
340    @Override
341    public Connection createConnection(Configuration conf) throws IOException {
342      return null;
343    }
344
345    @Override
346    public AsyncClusterConnection getAsyncClusterConnection() {
347      return null;
348    }
349
350    @Override
351    public KeyManagementService getKeyManagementService() {
352      return null;
353    }
354  }
355
356  public static Stream<Arguments> parameters() {
357    return Stream.of(Arguments.of(SimpleRpcServer.class.getName()),
358      Arguments.of(NettyRpcServer.class.getName()));
359  }
360
361  public String rpcServerImpl;
362
363  public TestTokenAuthentication(String rpcServerImpl) {
364    this.rpcServerImpl = rpcServerImpl;
365  }
366
367  private HBaseTestingUtil TEST_UTIL;
368  private TokenServer server;
369  private Thread serverThread;
370  private AuthenticationTokenSecretManager secretManager;
371  private ClusterId clusterId = new ClusterId();
372
373  @BeforeEach
374  public void setUp() throws Exception {
375    TEST_UTIL = new HBaseTestingUtil();
376    // Override the connection registry to avoid spinning up a mini cluster for the connection below
377    // to go through.
378    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
379      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
380    TEST_UTIL.startMiniZKCluster();
381    // register token type for protocol
382    SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
383      new SecurityInfo("hbase.test.kerberos.principal",
384        AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
385    // security settings only added after startup so that ZK does not require SASL
386    Configuration conf = TEST_UTIL.getConfiguration();
387    conf.set("hadoop.security.authentication", "kerberos");
388    conf.set("hbase.security.authentication", "kerberos");
389    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
390    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
391    server = new TokenServer(conf);
392    serverThread = new Thread(server);
393    Threads.setDaemonThreadRunning(serverThread,
394      "TokenServer:" + server.getServerName().toString());
395    // wait for startup
396    while (!server.isStarted() && !server.isStopped()) {
397      Thread.sleep(10);
398    }
399    server.rpcServer.refreshAuthManager(conf, new PolicyProvider() {
400      @Override
401      public Service[] getServices() {
402        return new Service[] { new Service("security.client.protocol.acl",
403          AuthenticationProtos.AuthenticationService.BlockingInterface.class) };
404      }
405    });
406    ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
407    secretManager = (AuthenticationTokenSecretManager) server.getSecretManager();
408    while (secretManager.getCurrentKey() == null) {
409      Thread.sleep(1);
410    }
411  }
412
413  @AfterEach
414  public void tearDown() throws Exception {
415    server.stop("Test complete");
416    Threads.shutdown(serverThread);
417    TEST_UTIL.shutdownMiniZKCluster();
418  }
419
420  @TestTemplate
421  public void testTokenCreation() throws Exception {
422    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
423
424    AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
425    Writables.getWritable(token.getIdentifier(), ident);
426    assertEquals("testuser", ident.getUsername(), "Token username should match");
427    byte[] passwd = secretManager.retrievePassword(ident);
428    assertTrue(Bytes.equals(token.getPassword(), passwd),
429      "Token password and password from secret manager should match");
430  }
431  // This won't work any more now RpcServer takes Shaded Service. It depends on RPCServer being able
432  // to provide a
433  // non-shaded service. TODO: FIX. Tried to make RPC generic but then it ripples; have to make
434  // Connection generic.
435  // And Call generic, etc.
436  //
437  // @Test
438  // public void testTokenAuthentication() throws Exception {
439  // UserGroupInformation testuser =
440  // UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
441  // testuser.setAuthenticationMethod(
442  // UserGroupInformation.AuthenticationMethod.TOKEN);
443  // final Configuration conf = TEST_UTIL.getConfiguration();
444  // UserGroupInformation.setConfiguration(conf);
445  // Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
446  // LOG.debug("Got token: " + token.toString());
447  // testuser.addToken(token);
448  // // Verify the server authenticates us as this token user
449  // testuser.doAs(new PrivilegedExceptionAction<Object>() {
450  // public Object run() throws Exception {
451  // Configuration c = server.getConfiguration();
452  // final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
453  // ServerName sn =
454  // ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
455  // EnvironmentEdgeManager.currentTime());
456  // try {
457  // // Make a proxy to go between the shaded RpcController that rpc expects and the
458  // // non-shaded controller this CPEP is providing. This is because this test does a neat
459  // // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
460  // // worked fine before we shaded PB. Now we need these proxies.
461  // final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel =
462  // rpcClient.createBlockingRpcChannel(sn, User.getCurrent(),
463  // HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
464  // AuthenticationProtos.AuthenticationService.BlockingInterface stub =
465  // AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
466  // AuthenticationProtos.WhoAmIResponse response =
467  // stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
468  // String myname = response.getUsername();
469  // assertEquals("testuser", myname);
470  // String authMethod = response.getAuthMethod();
471  // assertEquals("TOKEN", authMethod);
472  // } finally {
473  // rpcClient.close();
474  // }
475  // return null;
476  // }
477  // });
478  // }
479
480  @TestTemplate
481  public void testUseExistingToken() throws Exception {
482    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
483      new String[] { "testgroup" });
484    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken(user.getName());
485    assertNotNull(token);
486    user.addToken(token);
487
488    // make sure we got a token
489    Token<AuthenticationTokenIdentifier> firstToken =
490      new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
491    assertNotNull(firstToken);
492    assertEquals(token, firstToken);
493
494    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
495    try {
496      assertFalse(TokenUtil.addTokenIfMissing(conn, user));
497      // make sure we still have the same token
498      Token<AuthenticationTokenIdentifier> secondToken =
499        new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
500      assertEquals(firstToken, secondToken);
501    } finally {
502      conn.close();
503    }
504  }
505}