001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.token;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import java.io.IOException;
029import java.net.InetSocketAddress;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Collection;
033import java.util.List;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.hbase.ChoreService;
037import org.apache.hadoop.hbase.ClusterId;
038import org.apache.hadoop.hbase.CoordinatedStateManager;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.Server;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.client.AsyncClusterConnection;
045import org.apache.hadoop.hbase.client.Connection;
046import org.apache.hadoop.hbase.client.ConnectionFactory;
047import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
048import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
049import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
050import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
051import org.apache.hadoop.hbase.ipc.NettyRpcServer;
052import org.apache.hadoop.hbase.ipc.RpcServer;
053import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
054import org.apache.hadoop.hbase.ipc.RpcServerFactory;
055import org.apache.hadoop.hbase.ipc.RpcServerInterface;
056import org.apache.hadoop.hbase.ipc.ServerRpcController;
057import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
058import org.apache.hadoop.hbase.keymeta.KeyManagementService;
059import org.apache.hadoop.hbase.log.HBaseMarkers;
060import org.apache.hadoop.hbase.regionserver.RegionServerServices;
061import org.apache.hadoop.hbase.security.SecurityInfo;
062import org.apache.hadoop.hbase.security.User;
063import org.apache.hadoop.hbase.testclassification.MediumTests;
064import org.apache.hadoop.hbase.testclassification.SecurityTests;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
067import org.apache.hadoop.hbase.util.Sleeper;
068import org.apache.hadoop.hbase.util.Strings;
069import org.apache.hadoop.hbase.util.Threads;
070import org.apache.hadoop.hbase.util.Writables;
071import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
072import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
073import org.apache.hadoop.net.DNS;
074import org.apache.hadoop.security.authorize.PolicyProvider;
075import org.apache.hadoop.security.authorize.Service;
076import org.apache.hadoop.security.token.SecretManager;
077import org.apache.hadoop.security.token.Token;
078import org.apache.hadoop.security.token.TokenIdentifier;
079import org.junit.After;
080import org.junit.Before;
081import org.junit.ClassRule;
082import org.junit.Test;
083import org.junit.experimental.categories.Category;
084import org.junit.runner.RunWith;
085import org.junit.runners.Parameterized;
086import org.junit.runners.Parameterized.Parameter;
087import org.junit.runners.Parameterized.Parameters;
088import org.mockito.Mockito;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
093import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
094import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
095import org.apache.hbase.thirdparty.com.google.protobuf.Message;
096import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
097import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
098
099import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos;
100
101/**
102 * Tests for authentication token creation and usage
103 */
104// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer
105// to offer up. It worked find pre-hbase-2.0.0 but post the shading project, it fails because
106// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded
107// protobufs. Since hbase-2.0.0, we added convertion from shaded to non-shaded so this test keeps
108// working.
109@RunWith(Parameterized.class)
110@Category({ SecurityTests.class, MediumTests.class })
111public class TestTokenAuthentication {
112
113  @ClassRule
114  public static final HBaseClassTestRule CLASS_RULE =
115    HBaseClassTestRule.forClass(TestTokenAuthentication.class);
116
117  static {
118    // Setting whatever system properties after recommendation from
119    // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html
120    System.setProperty("java.security.krb5.realm", "hbase");
121    System.setProperty("java.security.krb5.kdc", "blah");
122  }
123
124  /**
125   * Basic server process for RPC authentication testing
126   */
127  private static class TokenServer extends TokenProvider
128    implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
129    private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class);
130    private Configuration conf;
131    private HBaseTestingUtil TEST_UTIL;
132    private RpcServerInterface rpcServer;
133    private InetSocketAddress isa;
134    private ZKWatcher zookeeper;
135    private Sleeper sleeper;
136    private boolean started = false;
137    private boolean aborted = false;
138    private boolean stopped = false;
139    private long startcode;
140
141    public TokenServer(Configuration conf, HBaseTestingUtil TEST_UTIL) throws IOException {
142      this.conf = conf;
143      this.TEST_UTIL = TEST_UTIL;
144      this.startcode = EnvironmentEdgeManager.currentTime();
145      // Server to handle client requests.
146      String hostname =
147        Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
148      int port = 0;
149      // Creation of an ISA will force a resolve.
150      InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
151      if (initialIsa.getAddress() == null) {
152        throw new IllegalArgumentException("Failed resolve of " + initialIsa);
153      }
154      final List<BlockingServiceAndInterface> sai = new ArrayList<>(1);
155      // Make a proxy to go between the shaded Service that rpc expects and the
156      // non-shaded Service this CPEP is providing. This is because this test does a neat
157      // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
158      // worked fine before we shaded PB. Now we need these proxies.
159      final BlockingService service =
160        AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
161      final BlockingService proxy = new BlockingService() {
162        @Override
163        public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
164          Message param) throws ServiceException {
165          MethodDescriptor methodDescriptor =
166            service.getDescriptorForType().findMethodByName(md.getName());
167          Message request = service.getRequestPrototype(methodDescriptor);
168          // TODO: Convert rpcController
169          Message response = null;
170          try {
171            response = service.callBlockingMethod(methodDescriptor, null, request);
172          } catch (ServiceException e) {
173            throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e);
174          }
175          return null;// Convert 'response'.
176        }
177
178        @Override
179        public ServiceDescriptor getDescriptorForType() {
180          return null;
181        }
182
183        @Override
184        public Message getRequestPrototype(MethodDescriptor arg0) {
185          // TODO Auto-generated method stub
186          return null;
187        }
188
189        @Override
190        public Message getResponsePrototype(MethodDescriptor arg0) {
191          // TODO Auto-generated method stub
192          return null;
193        }
194      };
195      sai.add(new BlockingServiceAndInterface(proxy,
196        AuthenticationProtos.AuthenticationService.BlockingInterface.class));
197      this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf,
198        new FifoRpcScheduler(conf, 1));
199      InetSocketAddress address = rpcServer.getListenerAddress();
200      if (address == null) {
201        throw new IOException("Listener channel is closed");
202      }
203      this.isa = address;
204      this.sleeper = new Sleeper(1000, this);
205    }
206
207    @Override
208    public Configuration getConfiguration() {
209      return conf;
210    }
211
212    @Override
213    public Connection getConnection() {
214      return null;
215    }
216
217    @Override
218    public ZKWatcher getZooKeeper() {
219      return zookeeper;
220    }
221
222    @Override
223    public CoordinatedStateManager getCoordinatedStateManager() {
224      return null;
225    }
226
227    @Override
228    public boolean isAborted() {
229      return aborted;
230    }
231
232    @Override
233    public ServerName getServerName() {
234      return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
235    }
236
237    @Override
238    public FileSystem getFileSystem() {
239      return null;
240    }
241
242    @Override
243    public boolean isStopping() {
244      return this.stopped;
245    }
246
247    @Override
248    public void abort(String reason, Throwable error) {
249      LOG.error(HBaseMarkers.FATAL, "Aborting on: " + reason, error);
250      this.aborted = true;
251      this.stopped = true;
252      sleeper.skipSleepCycle();
253    }
254
255    private void initialize() throws IOException {
256      // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
257      Configuration zkConf = new Configuration(conf);
258      zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
259      this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(), this, true);
260      this.rpcServer.start();
261
262      // Mock up region coprocessor environment
263      RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class,
264        Mockito.withSettings().extraInterfaces(HasRegionServerServices.class));
265      when(mockRegionCpEnv.getConfiguration()).thenReturn(conf);
266      when(mockRegionCpEnv.getClassLoader())
267        .then((var1) -> Thread.currentThread().getContextClassLoader());
268      RegionServerServices mockRss = mock(RegionServerServices.class);
269      when(mockRss.getRpcServer()).thenReturn(rpcServer);
270      when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices())
271        .thenReturn(mockRss);
272
273      super.start(mockRegionCpEnv);
274      started = true;
275    }
276
277    @Override
278    public void run() {
279      try {
280        initialize();
281        while (!stopped) {
282          this.sleeper.sleep();
283        }
284      } catch (Exception e) {
285        abort(e.getMessage(), e);
286      }
287      this.rpcServer.stop();
288    }
289
290    public boolean isStarted() {
291      return started;
292    }
293
294    @Override
295    public void stop(String reason) {
296      LOG.info("Stopping due to: " + reason);
297      this.stopped = true;
298      sleeper.skipSleepCycle();
299    }
300
301    @Override
302    public boolean isStopped() {
303      return stopped;
304    }
305
306    public InetSocketAddress getAddress() {
307      return isa;
308    }
309
310    public SecretManager<? extends TokenIdentifier> getSecretManager() {
311      return ((RpcServer) rpcServer).getSecretManager();
312    }
313
314    @Override
315    public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
316      RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
317      throws ServiceException {
318      LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null));
319      // Ignore above passed in controller -- it is always null
320      ServerRpcController serverController = new ServerRpcController();
321      final BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback =
322        new BlockingRpcCallback<>();
323      getAuthenticationToken(null, request, callback);
324      try {
325        serverController.checkFailed();
326        return callback.get();
327      } catch (IOException ioe) {
328        throw new ServiceException(ioe);
329      }
330    }
331
332    @Override
333    public AuthenticationProtos.WhoAmIResponse whoAmI(RpcController controller,
334      AuthenticationProtos.WhoAmIRequest request) throws ServiceException {
335      LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null));
336      // Ignore above passed in controller -- it is always null
337      ServerRpcController serverController = new ServerRpcController();
338      BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
339        new BlockingRpcCallback<>();
340      whoAmI(null, request, callback);
341      try {
342        serverController.checkFailed();
343        return callback.get();
344      } catch (IOException ioe) {
345        throw new ServiceException(ioe);
346      }
347    }
348
349    @Override
350    public ChoreService getChoreService() {
351      return null;
352    }
353
354    @Override
355    public Connection createConnection(Configuration conf) throws IOException {
356      return null;
357    }
358
359    @Override
360    public AsyncClusterConnection getAsyncClusterConnection() {
361      return null;
362    }
363
364    @Override
365    public KeyManagementService getKeyManagementService() {
366      return null;
367    }
368  }
369
370  @Parameters(name = "{index}: rpcServerImpl={0}")
371  public static Collection<Object[]> parameters() {
372    return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
373      new Object[] { NettyRpcServer.class.getName() });
374  }
375
376  @Parameter(0)
377  public String rpcServerImpl;
378
379  private HBaseTestingUtil TEST_UTIL;
380  private TokenServer server;
381  private Thread serverThread;
382  private AuthenticationTokenSecretManager secretManager;
383  private ClusterId clusterId = new ClusterId();
384
385  @Before
386  public void setUp() throws Exception {
387    TEST_UTIL = new HBaseTestingUtil();
388    // Override the connection registry to avoid spinning up a mini cluster for the connection below
389    // to go through.
390    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
391      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
392    TEST_UTIL.startMiniZKCluster();
393    // register token type for protocol
394    SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
395      new SecurityInfo("hbase.test.kerberos.principal",
396        AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
397    // security settings only added after startup so that ZK does not require SASL
398    Configuration conf = TEST_UTIL.getConfiguration();
399    conf.set("hadoop.security.authentication", "kerberos");
400    conf.set("hbase.security.authentication", "kerberos");
401    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
402    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
403    server = new TokenServer(conf, TEST_UTIL);
404    serverThread = new Thread(server);
405    Threads.setDaemonThreadRunning(serverThread,
406      "TokenServer:" + server.getServerName().toString());
407    // wait for startup
408    while (!server.isStarted() && !server.isStopped()) {
409      Thread.sleep(10);
410    }
411    server.rpcServer.refreshAuthManager(conf, new PolicyProvider() {
412      @Override
413      public Service[] getServices() {
414        return new Service[] { new Service("security.client.protocol.acl",
415          AuthenticationProtos.AuthenticationService.BlockingInterface.class) };
416      }
417    });
418    ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
419    secretManager = (AuthenticationTokenSecretManager) server.getSecretManager();
420    while (secretManager.getCurrentKey() == null) {
421      Thread.sleep(1);
422    }
423  }
424
425  @After
426  public void tearDown() throws Exception {
427    server.stop("Test complete");
428    Threads.shutdown(serverThread);
429    TEST_UTIL.shutdownMiniZKCluster();
430  }
431
432  @Test
433  public void testTokenCreation() throws Exception {
434    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
435
436    AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
437    Writables.getWritable(token.getIdentifier(), ident);
438    assertEquals("Token username should match", "testuser", ident.getUsername());
439    byte[] passwd = secretManager.retrievePassword(ident);
440    assertTrue("Token password and password from secret manager should match",
441      Bytes.equals(token.getPassword(), passwd));
442  }
443  // This won't work any more now RpcServer takes Shaded Service. It depends on RPCServer being able
444  // to provide a
445  // non-shaded service. TODO: FIX. Tried to make RPC generic but then it ripples; have to make
446  // Connection generic.
447  // And Call generic, etc.
448  //
449  // @Test
450  // public void testTokenAuthentication() throws Exception {
451  // UserGroupInformation testuser =
452  // UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
453  // testuser.setAuthenticationMethod(
454  // UserGroupInformation.AuthenticationMethod.TOKEN);
455  // final Configuration conf = TEST_UTIL.getConfiguration();
456  // UserGroupInformation.setConfiguration(conf);
457  // Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
458  // LOG.debug("Got token: " + token.toString());
459  // testuser.addToken(token);
460  // // Verify the server authenticates us as this token user
461  // testuser.doAs(new PrivilegedExceptionAction<Object>() {
462  // public Object run() throws Exception {
463  // Configuration c = server.getConfiguration();
464  // final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
465  // ServerName sn =
466  // ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
467  // EnvironmentEdgeManager.currentTime());
468  // try {
469  // // Make a proxy to go between the shaded RpcController that rpc expects and the
470  // // non-shaded controller this CPEP is providing. This is because this test does a neat
471  // // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
472  // // worked fine before we shaded PB. Now we need these proxies.
473  // final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel =
474  // rpcClient.createBlockingRpcChannel(sn, User.getCurrent(),
475  // HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
476  // AuthenticationProtos.AuthenticationService.BlockingInterface stub =
477  // AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
478  // AuthenticationProtos.WhoAmIResponse response =
479  // stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
480  // String myname = response.getUsername();
481  // assertEquals("testuser", myname);
482  // String authMethod = response.getAuthMethod();
483  // assertEquals("TOKEN", authMethod);
484  // } finally {
485  // rpcClient.close();
486  // }
487  // return null;
488  // }
489  // });
490  // }
491
492  @Test
493  public void testUseExistingToken() throws Exception {
494    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
495      new String[] { "testgroup" });
496    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken(user.getName());
497    assertNotNull(token);
498    user.addToken(token);
499
500    // make sure we got a token
501    Token<AuthenticationTokenIdentifier> firstToken =
502      new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
503    assertNotNull(firstToken);
504    assertEquals(token, firstToken);
505
506    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
507    try {
508      assertFalse(TokenUtil.addTokenIfMissing(conn, user));
509      // make sure we still have the same token
510      Token<AuthenticationTokenIdentifier> secondToken =
511        new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
512      assertEquals(firstToken, secondToken);
513    } finally {
514      conn.close();
515    }
516  }
517}