001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.token;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.net.InetSocketAddress;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Collection;
033import java.util.List;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.hbase.ChoreService;
037import org.apache.hadoop.hbase.ClusterId;
038import org.apache.hadoop.hbase.CoordinatedStateManager;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.Server;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.client.AsyncClusterConnection;
045import org.apache.hadoop.hbase.client.Connection;
046import org.apache.hadoop.hbase.client.ConnectionFactory;
047import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
049import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
050import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
051import org.apache.hadoop.hbase.ipc.NettyRpcServer;
052import org.apache.hadoop.hbase.ipc.RpcServer;
053import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
054import org.apache.hadoop.hbase.ipc.RpcServerFactory;
055import org.apache.hadoop.hbase.ipc.RpcServerInterface;
056import org.apache.hadoop.hbase.ipc.ServerRpcController;
057import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
058import org.apache.hadoop.hbase.log.HBaseMarkers;
059import org.apache.hadoop.hbase.regionserver.RegionServerServices;
060import org.apache.hadoop.hbase.security.SecurityInfo;
061import org.apache.hadoop.hbase.security.User;
062import org.apache.hadoop.hbase.testclassification.MediumTests;
063import org.apache.hadoop.hbase.testclassification.SecurityTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.hadoop.hbase.util.Sleeper;
067import org.apache.hadoop.hbase.util.Strings;
068import org.apache.hadoop.hbase.util.Threads;
069import org.apache.hadoop.hbase.util.Writables;
070import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
071import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
072import org.apache.hadoop.net.DNS;
073import org.apache.hadoop.security.authorize.PolicyProvider;
074import org.apache.hadoop.security.authorize.Service;
075import org.apache.hadoop.security.token.SecretManager;
076import org.apache.hadoop.security.token.Token;
077import org.apache.hadoop.security.token.TokenIdentifier;
078import org.junit.After;
079import org.junit.Before;
080import org.junit.ClassRule;
081import org.junit.Test;
082import org.junit.experimental.categories.Category;
083import org.junit.runner.RunWith;
084import org.junit.runners.Parameterized;
085import org.junit.runners.Parameterized.Parameter;
086import org.junit.runners.Parameterized.Parameters;
087import org.mockito.Mockito;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090
091import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
092import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
093import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
094import org.apache.hbase.thirdparty.com.google.protobuf.Message;
095import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
096import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
097
098import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos;
099
100/**
101 * Tests for authentication token creation and usage
102 */
// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer
// to offer up. It worked fine pre-hbase-2.0.0 but post the shading project, it fails because
// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded
// protobufs. Since hbase-2.0.0, we added conversion from shaded to non-shaded so this test keeps
// working.
108@RunWith(Parameterized.class)
109@Category({ SecurityTests.class, MediumTests.class })
110public class TestTokenAuthentication {
111
112  @ClassRule
113  public static final HBaseClassTestRule CLASS_RULE =
114    HBaseClassTestRule.forClass(TestTokenAuthentication.class);
115
116  static {
117    // Setting whatever system properties after recommendation from
118    // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html
119    System.setProperty("java.security.krb5.realm", "hbase");
120    System.setProperty("java.security.krb5.kdc", "blah");
121  }
122
123  /**
124   * Basic server process for RPC authentication testing
125   */
126  private static class TokenServer extends TokenProvider
127    implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
128    private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class);
129    private Configuration conf;
130    private HBaseTestingUtil TEST_UTIL;
131    private RpcServerInterface rpcServer;
132    private InetSocketAddress isa;
133    private ZKWatcher zookeeper;
134    private Sleeper sleeper;
135    private boolean started = false;
136    private boolean aborted = false;
137    private boolean stopped = false;
138    private long startcode;
139
140    public TokenServer(Configuration conf, HBaseTestingUtil TEST_UTIL) throws IOException {
141      this.conf = conf;
142      this.TEST_UTIL = TEST_UTIL;
143      this.startcode = EnvironmentEdgeManager.currentTime();
144      // Server to handle client requests.
145      String hostname =
146        Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
147      int port = 0;
148      // Creation of an ISA will force a resolve.
149      InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
150      if (initialIsa.getAddress() == null) {
151        throw new IllegalArgumentException("Failed resolve of " + initialIsa);
152      }
153      final List<BlockingServiceAndInterface> sai = new ArrayList<>(1);
154      // Make a proxy to go between the shaded Service that rpc expects and the
155      // non-shaded Service this CPEP is providing. This is because this test does a neat
156      // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
157      // worked fine before we shaded PB. Now we need these proxies.
158      final BlockingService service =
159        AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
160      final BlockingService proxy = new BlockingService() {
161        @Override
162        public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
163          Message param) throws ServiceException {
164          MethodDescriptor methodDescriptor =
165            service.getDescriptorForType().findMethodByName(md.getName());
166          Message request = service.getRequestPrototype(methodDescriptor);
167          // TODO: Convert rpcController
168          Message response = null;
169          try {
170            response = service.callBlockingMethod(methodDescriptor, null, request);
171          } catch (ServiceException e) {
172            throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e);
173          }
174          return null;// Convert 'response'.
175        }
176
177        @Override
178        public ServiceDescriptor getDescriptorForType() {
179          return null;
180        }
181
182        @Override
183        public Message getRequestPrototype(MethodDescriptor arg0) {
184          // TODO Auto-generated method stub
185          return null;
186        }
187
188        @Override
189        public Message getResponsePrototype(MethodDescriptor arg0) {
190          // TODO Auto-generated method stub
191          return null;
192        }
193      };
194      sai.add(new BlockingServiceAndInterface(proxy,
195        AuthenticationProtos.AuthenticationService.BlockingInterface.class));
196      this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf,
197        new FifoRpcScheduler(conf, 1));
198      InetSocketAddress address = rpcServer.getListenerAddress();
199      if (address == null) {
200        throw new IOException("Listener channel is closed");
201      }
202      this.isa = address;
203      this.sleeper = new Sleeper(1000, this);
204    }
205
206    @Override
207    public Configuration getConfiguration() {
208      return conf;
209    }
210
211    @Override
212    public Connection getConnection() {
213      return null;
214    }
215
216    @Override
217    public ZKWatcher getZooKeeper() {
218      return zookeeper;
219    }
220
221    @Override
222    public CoordinatedStateManager getCoordinatedStateManager() {
223      return null;
224    }
225
226    @Override
227    public boolean isAborted() {
228      return aborted;
229    }
230
231    @Override
232    public ServerName getServerName() {
233      return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
234    }
235
236    @Override
237    public FileSystem getFileSystem() {
238      return null;
239    }
240
241    @Override
242    public boolean isStopping() {
243      return this.stopped;
244    }
245
246    @Override
247    public void abort(String reason, Throwable error) {
248      LOG.error(HBaseMarkers.FATAL, "Aborting on: " + reason, error);
249      this.aborted = true;
250      this.stopped = true;
251      sleeper.skipSleepCycle();
252    }
253
254    private void initialize() throws IOException {
255      // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
256      Configuration zkConf = new Configuration(conf);
257      zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
258      this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(), this, true);
259      this.rpcServer.start();
260
261      // Mock up region coprocessor environment
262      RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class,
263        Mockito.withSettings().extraInterfaces(HasRegionServerServices.class));
264      when(mockRegionCpEnv.getConfiguration()).thenReturn(conf);
265      when(mockRegionCpEnv.getClassLoader())
266        .then((var1) -> Thread.currentThread().getContextClassLoader());
267      RegionServerServices mockRss = mock(RegionServerServices.class);
268      when(mockRss.getRpcServer()).thenReturn(rpcServer);
269      when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices())
270        .thenReturn(mockRss);
271
272      super.start(mockRegionCpEnv);
273      started = true;
274    }
275
276    @Override
277    public void run() {
278      try {
279        initialize();
280        while (!stopped) {
281          this.sleeper.sleep();
282        }
283      } catch (Exception e) {
284        abort(e.getMessage(), e);
285      }
286      this.rpcServer.stop();
287    }
288
289    public boolean isStarted() {
290      return started;
291    }
292
293    @Override
294    public void stop(String reason) {
295      LOG.info("Stopping due to: " + reason);
296      this.stopped = true;
297      sleeper.skipSleepCycle();
298    }
299
300    @Override
301    public boolean isStopped() {
302      return stopped;
303    }
304
305    public InetSocketAddress getAddress() {
306      return isa;
307    }
308
309    public SecretManager<? extends TokenIdentifier> getSecretManager() {
310      return ((RpcServer) rpcServer).getSecretManager();
311    }
312
313    @Override
314    public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
315      RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
316      throws ServiceException {
317      LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null));
318      // Ignore above passed in controller -- it is always null
319      ServerRpcController serverController = new ServerRpcController();
320      final BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback =
321        new BlockingRpcCallback<>();
322      getAuthenticationToken(null, request, callback);
323      try {
324        serverController.checkFailed();
325        return callback.get();
326      } catch (IOException ioe) {
327        throw new ServiceException(ioe);
328      }
329    }
330
331    @Override
332    public AuthenticationProtos.WhoAmIResponse whoAmI(RpcController controller,
333      AuthenticationProtos.WhoAmIRequest request) throws ServiceException {
334      LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null));
335      // Ignore above passed in controller -- it is always null
336      ServerRpcController serverController = new ServerRpcController();
337      BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
338        new BlockingRpcCallback<>();
339      whoAmI(null, request, callback);
340      try {
341        serverController.checkFailed();
342        return callback.get();
343      } catch (IOException ioe) {
344        throw new ServiceException(ioe);
345      }
346    }
347
348    @Override
349    public ChoreService getChoreService() {
350      return null;
351    }
352
353    @Override
354    public Connection createConnection(Configuration conf) throws IOException {
355      return null;
356    }
357
358    @Override
359    public AsyncClusterConnection getAsyncClusterConnection() {
360      return null;
361    }
362  }
363
364  @Parameters(name = "{index}: rpcServerImpl={0}")
365  public static Collection<Object[]> parameters() {
366    return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
367      new Object[] { NettyRpcServer.class.getName() });
368  }
369
370  @Parameter(0)
371  public String rpcServerImpl;
372
373  private HBaseTestingUtil TEST_UTIL;
374  private TokenServer server;
375  private Thread serverThread;
376  private AuthenticationTokenSecretManager secretManager;
377  private ClusterId clusterId = new ClusterId();
378
379  @Before
380  public void setUp() throws Exception {
381    TEST_UTIL = new HBaseTestingUtil();
382    // Override the connection registry to avoid spinning up a mini cluster for the connection below
383    // to go through.
384    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
385      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
386    TEST_UTIL.startMiniZKCluster();
387    // register token type for protocol
388    SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
389      new SecurityInfo("hbase.test.kerberos.principal",
390        AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
391    // security settings only added after startup so that ZK does not require SASL
392    Configuration conf = TEST_UTIL.getConfiguration();
393    conf.set("hadoop.security.authentication", "kerberos");
394    conf.set("hbase.security.authentication", "kerberos");
395    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
396    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
397    server = new TokenServer(conf, TEST_UTIL);
398    serverThread = new Thread(server);
399    Threads.setDaemonThreadRunning(serverThread,
400      "TokenServer:" + server.getServerName().toString());
401    // wait for startup
402    while (!server.isStarted() && !server.isStopped()) {
403      Thread.sleep(10);
404    }
405    server.rpcServer.refreshAuthManager(conf, new PolicyProvider() {
406      @Override
407      public Service[] getServices() {
408        return new Service[] { new Service("security.client.protocol.acl",
409          AuthenticationProtos.AuthenticationService.BlockingInterface.class) };
410      }
411    });
412    ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
413    secretManager = (AuthenticationTokenSecretManager) server.getSecretManager();
414    while (secretManager.getCurrentKey() == null) {
415      Thread.sleep(1);
416    }
417  }
418
419  @After
420  public void tearDown() throws Exception {
421    server.stop("Test complete");
422    Threads.shutdown(serverThread);
423    TEST_UTIL.shutdownMiniZKCluster();
424  }
425
426  @Test
427  public void testTokenCreation() throws Exception {
428    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
429
430    AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
431    Writables.getWritable(token.getIdentifier(), ident);
432    assertEquals("Token username should match", "testuser", ident.getUsername());
433    byte[] passwd = secretManager.retrievePassword(ident);
434    assertTrue("Token password and password from secret manager should match",
435      Bytes.equals(token.getPassword(), passwd));
436  }
  // This won't work any more now that RpcServer takes a shaded Service: it depends on RpcServer
  // being able to provide a non-shaded service. TODO: FIX. Tried to make RPC generic but then it
  // ripples; have to make Connection generic, and Call generic, etc.
442  //
443  // @Test
444  // public void testTokenAuthentication() throws Exception {
445  // UserGroupInformation testuser =
446  // UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
447  // testuser.setAuthenticationMethod(
448  // UserGroupInformation.AuthenticationMethod.TOKEN);
449  // final Configuration conf = TEST_UTIL.getConfiguration();
450  // UserGroupInformation.setConfiguration(conf);
451  // Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
452  // LOG.debug("Got token: " + token.toString());
453  // testuser.addToken(token);
454  // // Verify the server authenticates us as this token user
455  // testuser.doAs(new PrivilegedExceptionAction<Object>() {
456  // public Object run() throws Exception {
457  // Configuration c = server.getConfiguration();
458  // final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
459  // ServerName sn =
460  // ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
461  // EnvironmentEdgeManager.currentTime());
462  // try {
463  // // Make a proxy to go between the shaded RpcController that rpc expects and the
464  // // non-shaded controller this CPEP is providing. This is because this test does a neat
465  // // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
466  // // worked fine before we shaded PB. Now we need these proxies.
467  // final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel =
468  // rpcClient.createBlockingRpcChannel(sn, User.getCurrent(),
469  // HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
470  // AuthenticationProtos.AuthenticationService.BlockingInterface stub =
471  // AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
472  // AuthenticationProtos.WhoAmIResponse response =
473  // stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
474  // String myname = response.getUsername();
475  // assertEquals("testuser", myname);
476  // String authMethod = response.getAuthMethod();
477  // assertEquals("TOKEN", authMethod);
478  // } finally {
479  // rpcClient.close();
480  // }
481  // return null;
482  // }
483  // });
484  // }
485
486  @Test
487  public void testUseExistingToken() throws Exception {
488    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
489      new String[] { "testgroup" });
490    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken(user.getName());
491    assertNotNull(token);
492    user.addToken(token);
493
494    // make sure we got a token
495    Token<AuthenticationTokenIdentifier> firstToken =
496      new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
497    assertNotNull(firstToken);
498    assertEquals(token, firstToken);
499
500    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
501    try {
502      assertFalse(TokenUtil.addTokenIfMissing(conn, user));
503      // make sure we still have the same token
504      Token<AuthenticationTokenIdentifier> secondToken =
505        new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
506      assertEquals(firstToken, secondToken);
507    } finally {
508      conn.close();
509    }
510  }
511}