001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.token;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import com.google.protobuf.BlockingService;
029import com.google.protobuf.RpcController;
030import com.google.protobuf.ServiceException;
031import java.io.IOException;
032import java.io.InterruptedIOException;
033import java.net.InetSocketAddress;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.List;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.hbase.ChoreService;
041import org.apache.hadoop.hbase.ClusterId;
042import org.apache.hadoop.hbase.CoordinatedStateManager;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.Server;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.client.ClusterConnection;
049import org.apache.hadoop.hbase.client.Connection;
050import org.apache.hadoop.hbase.client.ConnectionFactory;
051import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
053import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
054import org.apache.hadoop.hbase.ipc.NettyRpcServer;
055import org.apache.hadoop.hbase.ipc.RpcServer;
056import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
057import org.apache.hadoop.hbase.ipc.RpcServerFactory;
058import org.apache.hadoop.hbase.ipc.RpcServerInterface;
059import org.apache.hadoop.hbase.ipc.ServerRpcController;
060import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
061import org.apache.hadoop.hbase.log.HBaseMarkers;
062import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
063import org.apache.hadoop.hbase.regionserver.RegionServerServices;
064import org.apache.hadoop.hbase.security.SecurityInfo;
065import org.apache.hadoop.hbase.security.User;
066import org.apache.hadoop.hbase.testclassification.MediumTests;
067import org.apache.hadoop.hbase.testclassification.SecurityTests;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
070import org.apache.hadoop.hbase.util.Sleeper;
071import org.apache.hadoop.hbase.util.Strings;
072import org.apache.hadoop.hbase.util.Threads;
073import org.apache.hadoop.hbase.util.Writables;
074import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
075import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
076import org.apache.hadoop.net.DNS;
077import org.apache.hadoop.security.authorize.PolicyProvider;
078import org.apache.hadoop.security.authorize.Service;
079import org.apache.hadoop.security.token.SecretManager;
080import org.apache.hadoop.security.token.Token;
081import org.apache.hadoop.security.token.TokenIdentifier;
082import org.junit.After;
083import org.junit.Before;
084import org.junit.ClassRule;
085import org.junit.Test;
086import org.junit.experimental.categories.Category;
087import org.junit.runner.RunWith;
088import org.junit.runners.Parameterized;
089import org.junit.runners.Parameterized.Parameter;
090import org.junit.runners.Parameterized.Parameters;
091import org.mockito.Mockito;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
095import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
096import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
097import org.apache.hbase.thirdparty.com.google.protobuf.Message;
098
099/**
100 * Tests for authentication token creation and usage
101 */
// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer
// to offer up. It worked fine pre-hbase-2.0.0 but post the shading project, it fails because
// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded
// protobufs. Since hbase-2.0.0, we added conversion from shaded to non-shaded so this test keeps
// working.
107@RunWith(Parameterized.class)
108@Category({ SecurityTests.class, MediumTests.class })
109public class TestTokenAuthentication {
110
111  @ClassRule
112  public static final HBaseClassTestRule CLASS_RULE =
113    HBaseClassTestRule.forClass(TestTokenAuthentication.class);
114
115  static {
116    // Setting whatever system properties after recommendation from
117    // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html
118    System.setProperty("java.security.krb5.realm", "hbase");
119    System.setProperty("java.security.krb5.kdc", "blah");
120  }
121
122  /**
123   * Basic server process for RPC authentication testing
124   */
125  private static class TokenServer extends TokenProvider
126    implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
127    private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class);
128    private Configuration conf;
129    private HBaseTestingUtility TEST_UTIL;
130    private RpcServerInterface rpcServer;
131    private InetSocketAddress isa;
132    private ZKWatcher zookeeper;
133    private Sleeper sleeper;
134    private boolean started = false;
135    private boolean aborted = false;
136    private boolean stopped = false;
137    private long startcode;
138
139    public TokenServer(Configuration conf, HBaseTestingUtility TEST_UTIL) throws IOException {
140      this.conf = conf;
141      this.TEST_UTIL = TEST_UTIL;
142      this.startcode = EnvironmentEdgeManager.currentTime();
143      // Server to handle client requests.
144      String hostname =
145        Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
146      int port = 0;
147      // Creation of an ISA will force a resolve.
148      InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
149      if (initialIsa.getAddress() == null) {
150        throw new IllegalArgumentException("Failed resolve of " + initialIsa);
151      }
152      final List<BlockingServiceAndInterface> sai = new ArrayList<>(1);
153      // Make a proxy to go between the shaded Service that rpc expects and the
154      // non-shaded Service this CPEP is providing. This is because this test does a neat
155      // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
156      // worked fine before we shaded PB. Now we need these proxies.
157      final BlockingService service =
158        AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
159      final org.apache.hbase.thirdparty.com.google.protobuf.BlockingService proxy =
160        new org.apache.hbase.thirdparty.com.google.protobuf.BlockingService() {
161          @Override
162          public Message callBlockingMethod(MethodDescriptor md,
163            org.apache.hbase.thirdparty.com.google.protobuf.RpcController controller, Message param)
164            throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
165            com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor =
166              service.getDescriptorForType().findMethodByName(md.getName());
167            com.google.protobuf.Message request = service.getRequestPrototype(methodDescriptor);
168            // TODO: Convert rpcController
169            com.google.protobuf.Message response = null;
170            try {
171              response = service.callBlockingMethod(methodDescriptor, null, request);
172            } catch (ServiceException e) {
173              throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e);
174            }
175            return null;// Convert 'response'.
176          }
177
178          @Override
179          public ServiceDescriptor getDescriptorForType() {
180            return null;
181          }
182
183          @Override
184          public Message getRequestPrototype(MethodDescriptor arg0) {
185            // TODO Auto-generated method stub
186            return null;
187          }
188
189          @Override
190          public Message getResponsePrototype(MethodDescriptor arg0) {
191            // TODO Auto-generated method stub
192            return null;
193          }
194        };
195      sai.add(new BlockingServiceAndInterface(proxy,
196        AuthenticationProtos.AuthenticationService.BlockingInterface.class));
197      this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf,
198        new FifoRpcScheduler(conf, 1));
199      InetSocketAddress address = rpcServer.getListenerAddress();
200      if (address == null) {
201        throw new IOException("Listener channel is closed");
202      }
203      this.isa = address;
204      this.sleeper = new Sleeper(1000, this);
205    }
206
207    @Override
208    public Configuration getConfiguration() {
209      return conf;
210    }
211
212    @Override
213    public ClusterConnection getConnection() {
214      return null;
215    }
216
217    @Override
218    public ZKWatcher getZooKeeper() {
219      return zookeeper;
220    }
221
222    @Override
223    public CoordinatedStateManager getCoordinatedStateManager() {
224      return null;
225    }
226
227    @Override
228    public boolean isAborted() {
229      return aborted;
230    }
231
232    @Override
233    public ServerName getServerName() {
234      return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
235    }
236
237    @Override
238    public FileSystem getFileSystem() {
239      return null;
240    }
241
242    @Override
243    public boolean isStopping() {
244      return this.stopped;
245    }
246
247    @Override
248    public void abort(String reason, Throwable error) {
249      LOG.error(HBaseMarkers.FATAL, "Aborting on: " + reason, error);
250      this.aborted = true;
251      this.stopped = true;
252      sleeper.skipSleepCycle();
253    }
254
255    private void initialize() throws IOException {
256      // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
257      Configuration zkConf = new Configuration(conf);
258      zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
259      this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(), this, true);
260      this.rpcServer.start();
261
262      // Mock up region coprocessor environment
263      RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class,
264        Mockito.withSettings().extraInterfaces(HasRegionServerServices.class));
265      when(mockRegionCpEnv.getConfiguration()).thenReturn(conf);
266      when(mockRegionCpEnv.getClassLoader())
267        .then((var1) -> Thread.currentThread().getContextClassLoader());
268      RegionServerServices mockRss = mock(RegionServerServices.class);
269      when(mockRss.getRpcServer()).thenReturn(rpcServer);
270      when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices())
271        .thenReturn(mockRss);
272
273      super.start(mockRegionCpEnv);
274      started = true;
275    }
276
277    @Override
278    public void run() {
279      try {
280        initialize();
281        while (!stopped) {
282          this.sleeper.sleep();
283        }
284      } catch (Exception e) {
285        abort(e.getMessage(), e);
286      }
287      this.rpcServer.stop();
288    }
289
290    public boolean isStarted() {
291      return started;
292    }
293
294    @Override
295    public void stop(String reason) {
296      LOG.info("Stopping due to: " + reason);
297      this.stopped = true;
298      sleeper.skipSleepCycle();
299    }
300
301    @Override
302    public boolean isStopped() {
303      return stopped;
304    }
305
306    public InetSocketAddress getAddress() {
307      return isa;
308    }
309
310    public SecretManager<? extends TokenIdentifier> getSecretManager() {
311      return ((RpcServer) rpcServer).getSecretManager();
312    }
313
314    @Override
315    public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
316      RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
317      throws ServiceException {
318      LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null));
319      // Ignore above passed in controller -- it is always null
320      ServerRpcController serverController = new ServerRpcController();
321      final NonShadedBlockingRpcCallback<
322        AuthenticationProtos.GetAuthenticationTokenResponse> callback =
323          new NonShadedBlockingRpcCallback<>();
324      getAuthenticationToken(null, request, callback);
325      try {
326        serverController.checkFailed();
327        return callback.get();
328      } catch (IOException ioe) {
329        throw new ServiceException(ioe);
330      }
331    }
332
333    @Override
334    public AuthenticationProtos.WhoAmIResponse whoAmI(RpcController controller,
335      AuthenticationProtos.WhoAmIRequest request) throws ServiceException {
336      LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null));
337      // Ignore above passed in controller -- it is always null
338      ServerRpcController serverController = new ServerRpcController();
339      NonShadedBlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
340        new NonShadedBlockingRpcCallback<>();
341      whoAmI(null, request, callback);
342      try {
343        serverController.checkFailed();
344        return callback.get();
345      } catch (IOException ioe) {
346        throw new ServiceException(ioe);
347      }
348    }
349
350    @Override
351    public ChoreService getChoreService() {
352      return null;
353    }
354
355    @Override
356    public ClusterConnection getClusterConnection() {
357      // TODO Auto-generated method stub
358      return null;
359    }
360
361    @Override
362    public Connection createConnection(Configuration conf) throws IOException {
363      return null;
364    }
365  }
366
367  @Parameters(name = "{index}: rpcServerImpl={0}")
368  public static Collection<Object[]> parameters() {
369    return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
370      new Object[] { NettyRpcServer.class.getName() });
371  }
372
373  @Parameter(0)
374  public String rpcServerImpl;
375
376  private HBaseTestingUtility TEST_UTIL;
377  private TokenServer server;
378  private Thread serverThread;
379  private AuthenticationTokenSecretManager secretManager;
380  private ClusterId clusterId = new ClusterId();
381
382  @Before
383  public void setUp() throws Exception {
384    TEST_UTIL = new HBaseTestingUtility();
385    // Override the connection registry to avoid spinning up a mini cluster for the connection below
386    // to go through.
387    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
388      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
389    TEST_UTIL.startMiniZKCluster();
390    // register token type for protocol
391    SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
392      new SecurityInfo("hbase.test.kerberos.principal",
393        AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
394    // security settings only added after startup so that ZK does not require SASL
395    Configuration conf = TEST_UTIL.getConfiguration();
396    conf.set("hadoop.security.authentication", "kerberos");
397    conf.set("hbase.security.authentication", "kerberos");
398    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
399    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
400    server = new TokenServer(conf, TEST_UTIL);
401    serverThread = new Thread(server);
402    Threads.setDaemonThreadRunning(serverThread,
403      "TokenServer:" + server.getServerName().toString());
404    // wait for startup
405    while (!server.isStarted() && !server.isStopped()) {
406      Thread.sleep(10);
407    }
408    server.rpcServer.refreshAuthManager(conf, new PolicyProvider() {
409      @Override
410      public Service[] getServices() {
411        return new Service[] { new Service("security.client.protocol.acl",
412          AuthenticationProtos.AuthenticationService.BlockingInterface.class) };
413      }
414    });
415    ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
416    secretManager = (AuthenticationTokenSecretManager) server.getSecretManager();
417    while (secretManager.getCurrentKey() == null) {
418      Thread.sleep(1);
419    }
420  }
421
422  @After
423  public void tearDown() throws Exception {
424    server.stop("Test complete");
425    Threads.shutdown(serverThread);
426    TEST_UTIL.shutdownMiniZKCluster();
427  }
428
429  @Test
430  public void testTokenCreation() throws Exception {
431    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
432
433    AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
434    Writables.getWritable(token.getIdentifier(), ident);
435    assertEquals("Token username should match", "testuser", ident.getUsername());
436    byte[] passwd = secretManager.retrievePassword(ident);
437    assertTrue("Token password and password from secret manager should match",
438      Bytes.equals(token.getPassword(), passwd));
439  }
  // This won't work any more now that RpcServer takes a shaded Service. It depends on RpcServer
  // being able to provide a non-shaded service. TODO: FIX. Tried to make RPC generic but then it
  // ripples; have to make Connection generic, and Call generic, etc.
445  //
446  // @Test
447  // public void testTokenAuthentication() throws Exception {
448  // UserGroupInformation testuser =
449  // UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
450  // testuser.setAuthenticationMethod(
451  // UserGroupInformation.AuthenticationMethod.TOKEN);
452  // final Configuration conf = TEST_UTIL.getConfiguration();
453  // UserGroupInformation.setConfiguration(conf);
454  // Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
455  // LOG.debug("Got token: " + token.toString());
456  // testuser.addToken(token);
457  // // Verify the server authenticates us as this token user
458  // testuser.doAs(new PrivilegedExceptionAction<Object>() {
459  // public Object run() throws Exception {
460  // Configuration c = server.getConfiguration();
461  // final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
462  // ServerName sn =
463  // ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
464  // EnvironmentEdgeManager.currentTime());
465  // try {
466  // // Make a proxy to go between the shaded RpcController that rpc expects and the
467  // // non-shaded controller this CPEP is providing. This is because this test does a neat
468  // // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
469  // // worked fine before we shaded PB. Now we need these proxies.
470  // final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel =
471  // rpcClient.createBlockingRpcChannel(sn, User.getCurrent(),
472  // HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
473  // AuthenticationProtos.AuthenticationService.BlockingInterface stub =
474  // AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
475  // AuthenticationProtos.WhoAmIResponse response =
476  // stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
477  // String myname = response.getUsername();
478  // assertEquals("testuser", myname);
479  // String authMethod = response.getAuthMethod();
480  // assertEquals("TOKEN", authMethod);
481  // } finally {
482  // rpcClient.close();
483  // }
484  // return null;
485  // }
486  // });
487  // }
488
489  @Test
490  public void testUseExistingToken() throws Exception {
491    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
492      new String[] { "testgroup" });
493    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken(user.getName());
494    assertNotNull(token);
495    user.addToken(token);
496
497    // make sure we got a token
498    Token<AuthenticationTokenIdentifier> firstToken =
499      new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
500    assertNotNull(firstToken);
501    assertEquals(token, firstToken);
502
503    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
504    try {
505      assertFalse(TokenUtil.addTokenIfMissing(conn, user));
506      // make sure we still have the same token
507      Token<AuthenticationTokenIdentifier> secondToken =
508        new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
509      assertEquals(firstToken, secondToken);
510    } finally {
511      conn.close();
512    }
513  }
514
515  /**
516   * A copy of the BlockingRpcCallback class for use locally. Only difference is that it makes use
517   * of non-shaded protobufs; i.e. refers to com.google.protobuf.* rather than to
518   * org.apache.hbase.thirdparty.com.google.protobuf.*
519   */
520  private static class NonShadedBlockingRpcCallback<R>
521    implements com.google.protobuf.RpcCallback<R> {
522    private R result;
523    private boolean resultSet = false;
524
525    /**
526     * Called on completion of the RPC call with the response object, or {@code null} in the case of
527     * an error.
528     * @param parameter the response object or {@code null} if an error occurred
529     */
530    @Override
531    public void run(R parameter) {
532      synchronized (this) {
533        result = parameter;
534        resultSet = true;
535        this.notifyAll();
536      }
537    }
538
539    /**
540     * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
541     * passed. When used asynchronously, this method will block until the {@link #run(Object)}
542     * method has been called.
543     * @return the response object or {@code null} if no response was passed
544     */
545    public synchronized R get() throws IOException {
546      while (!resultSet) {
547        try {
548          this.wait();
549        } catch (InterruptedException ie) {
550          InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
551          exception.initCause(ie);
552          throw exception;
553        }
554      }
555      return result;
556    }
557  }
558}