001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.token;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import com.google.protobuf.BlockingService;
029import com.google.protobuf.RpcController;
030import com.google.protobuf.ServiceException;
031import java.io.IOException;
032import java.io.InterruptedIOException;
033import java.net.InetSocketAddress;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.List;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.hbase.ChoreService;
041import org.apache.hadoop.hbase.ClusterId;
042import org.apache.hadoop.hbase.CoordinatedStateManager;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.Server;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.client.ClusterConnection;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.ConnectionFactory;
050import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
052import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
053import org.apache.hadoop.hbase.ipc.NettyRpcServer;
054import org.apache.hadoop.hbase.ipc.RpcServer;
055import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
056import org.apache.hadoop.hbase.ipc.RpcServerFactory;
057import org.apache.hadoop.hbase.ipc.RpcServerInterface;
058import org.apache.hadoop.hbase.ipc.ServerRpcController;
059import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
060import org.apache.hadoop.hbase.log.HBaseMarkers;
061import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
062import org.apache.hadoop.hbase.regionserver.RegionServerServices;
063import org.apache.hadoop.hbase.security.SecurityInfo;
064import org.apache.hadoop.hbase.security.User;
065import org.apache.hadoop.hbase.testclassification.MediumTests;
066import org.apache.hadoop.hbase.testclassification.SecurityTests;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
069import org.apache.hadoop.hbase.util.Sleeper;
070import org.apache.hadoop.hbase.util.Strings;
071import org.apache.hadoop.hbase.util.Threads;
072import org.apache.hadoop.hbase.util.Writables;
073import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
074import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
075import org.apache.hadoop.net.DNS;
076import org.apache.hadoop.security.authorize.PolicyProvider;
077import org.apache.hadoop.security.authorize.Service;
078import org.apache.hadoop.security.token.SecretManager;
079import org.apache.hadoop.security.token.Token;
080import org.apache.hadoop.security.token.TokenIdentifier;
081import org.junit.After;
082import org.junit.Before;
083import org.junit.ClassRule;
084import org.junit.Test;
085import org.junit.experimental.categories.Category;
086import org.junit.runner.RunWith;
087import org.junit.runners.Parameterized;
088import org.junit.runners.Parameterized.Parameter;
089import org.junit.runners.Parameterized.Parameters;
090import org.mockito.Mockito;
091import org.slf4j.Logger;
092import org.slf4j.LoggerFactory;
093
094import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
095import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
096import org.apache.hbase.thirdparty.com.google.protobuf.Message;
097
098/**
099 * Tests for authentication token creation and usage
100 */
// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer
// to offer up. It worked fine pre-hbase-2.0.0 but post the shading project, it fails because
// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded
// protobufs. Since hbase-2.0.0, we added conversion from shaded to non-shaded so this test keeps
// working.
106@RunWith(Parameterized.class)
107@Category({SecurityTests.class, MediumTests.class})
108public class TestTokenAuthentication {
109
  // Class-level JUnit rule built by HBaseClassTestRule for this test class.
  // NOTE(review): presumably enforces the per-category class timeout -- confirm against
  // HBaseClassTestRule.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestTokenAuthentication.class);
113
  // Runs once when this class is initialized and sets the JVM-wide Kerberos realm and KDC
  // system properties before any test configures "kerberos" authentication.
  static {
    // Setting whatever system properties after recommendation from
    // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html
    // NOTE(review): "hbase" and "blah" look like placeholders (no real KDC is contacted) -- confirm.
    System.setProperty("java.security.krb5.realm", "hbase");
    System.setProperty("java.security.krb5.kdc", "blah");
  }
120
121  /**
122   * Basic server process for RPC authentication testing
123   */
124  private static class TokenServer extends TokenProvider implements
125      AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
126    private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class);
127    private Configuration conf;
128    private HBaseTestingUtility TEST_UTIL;
129    private RpcServerInterface rpcServer;
130    private InetSocketAddress isa;
131    private ZKWatcher zookeeper;
132    private Sleeper sleeper;
133    private boolean started = false;
134    private boolean aborted = false;
135    private boolean stopped = false;
136    private long startcode;
137
138    public TokenServer(Configuration conf, HBaseTestingUtility TEST_UTIL) throws IOException {
139      this.conf = conf;
140      this.TEST_UTIL = TEST_UTIL;
141      this.startcode = EnvironmentEdgeManager.currentTime();
142      // Server to handle client requests.
143      String hostname =
144        Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
145      int port = 0;
146      // Creation of an ISA will force a resolve.
147      InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
148      if (initialIsa.getAddress() == null) {
149        throw new IllegalArgumentException("Failed resolve of " + initialIsa);
150      }
151      final List<BlockingServiceAndInterface> sai = new ArrayList<>(1);
152      // Make a proxy to go between the shaded Service that rpc expects and the
153      // non-shaded Service this CPEP is providing. This is because this test does a neat
154      // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
155      // worked fine before we shaded PB. Now we need these proxies.
156      final BlockingService service =
157        AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
158      final org.apache.hbase.thirdparty.com.google.protobuf.BlockingService proxy =
159        new org.apache.hbase.thirdparty.com.google.protobuf.BlockingService() {
160          @Override
161          public Message callBlockingMethod(MethodDescriptor md,
162              org.apache.hbase.thirdparty.com.google.protobuf.RpcController controller,
163              Message param)
164              throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
165            com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor =
166              service.getDescriptorForType().findMethodByName(md.getName());
167            com.google.protobuf.Message request = service.getRequestPrototype(methodDescriptor);
168            // TODO: Convert rpcController
169            com.google.protobuf.Message response = null;
170            try {
171              response = service.callBlockingMethod(methodDescriptor, null, request);
172            } catch (ServiceException e) {
173              throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e);
174            }
175            return null;// Convert 'response'.
176          }
177
178          @Override
179          public ServiceDescriptor getDescriptorForType() {
180            return null;
181          }
182
183          @Override
184          public Message getRequestPrototype(MethodDescriptor arg0) {
185            // TODO Auto-generated method stub
186            return null;
187          }
188
189          @Override
190          public Message getResponsePrototype(MethodDescriptor arg0) {
191            // TODO Auto-generated method stub
192            return null;
193          }
194        };
195      sai.add(new BlockingServiceAndInterface(proxy,
196        AuthenticationProtos.AuthenticationService.BlockingInterface.class));
197      this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf,
198          new FifoRpcScheduler(conf, 1));
199      InetSocketAddress address = rpcServer.getListenerAddress();
200      if (address == null) {
201        throw new IOException("Listener channel is closed");
202      }
203      this.isa = address;
204      this.sleeper = new Sleeper(1000, this);
205    }
206
207    @Override
208    public Configuration getConfiguration() {
209      return conf;
210    }
211
212    @Override
213    public ClusterConnection getConnection() {
214      return null;
215    }
216
217    @Override
218    public ZKWatcher getZooKeeper() {
219      return zookeeper;
220    }
221
222    @Override
223    public CoordinatedStateManager getCoordinatedStateManager() {
224      return null;
225    }
226
227    @Override
228    public boolean isAborted() {
229      return aborted;
230    }
231
232    @Override
233    public ServerName getServerName() {
234      return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
235    }
236
237    @Override
238    public FileSystem getFileSystem() {
239      return null;
240    }
241
242    @Override
243    public boolean isStopping() {
244      return this.stopped;
245    }
246
247    @Override
248    public void abort(String reason, Throwable error) {
249      LOG.error(HBaseMarkers.FATAL, "Aborting on: "+reason, error);
250      this.aborted = true;
251      this.stopped = true;
252      sleeper.skipSleepCycle();
253    }
254
255    private void initialize() throws IOException {
256      // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
257      Configuration zkConf = new Configuration(conf);
258      zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
259      this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(),
260          this, true);
261      this.rpcServer.start();
262
263      // Mock up region coprocessor environment
264      RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class,
265          Mockito.withSettings().extraInterfaces(HasRegionServerServices.class));
266      when(mockRegionCpEnv.getConfiguration()).thenReturn(conf);
267      when(mockRegionCpEnv.getClassLoader()).then(
268          (var1) -> Thread.currentThread().getContextClassLoader());
269      RegionServerServices mockRss = mock(RegionServerServices.class);
270      when(mockRss.getRpcServer()).thenReturn(rpcServer);
271      when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices())
272          .thenReturn(mockRss);
273
274      super.start(mockRegionCpEnv);
275      started = true;
276    }
277
278    @Override
279    public void run() {
280      try {
281        initialize();
282        while (!stopped) {
283          this.sleeper.sleep();
284        }
285      } catch (Exception e) {
286        abort(e.getMessage(), e);
287      }
288      this.rpcServer.stop();
289    }
290
291    public boolean isStarted() {
292      return started;
293    }
294
295    @Override
296    public void stop(String reason) {
297      LOG.info("Stopping due to: "+reason);
298      this.stopped = true;
299      sleeper.skipSleepCycle();
300    }
301
302    @Override
303    public boolean isStopped() {
304      return stopped;
305    }
306
307    public InetSocketAddress getAddress() {
308      return isa;
309    }
310
311    public SecretManager<? extends TokenIdentifier> getSecretManager() {
312      return ((RpcServer)rpcServer).getSecretManager();
313    }
314
315    @Override
316    public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
317        RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
318      throws ServiceException {
319      LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null));
320      // Ignore above passed in controller -- it is always null
321      ServerRpcController serverController = new ServerRpcController();
322      final NonShadedBlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>
323        callback = new NonShadedBlockingRpcCallback<>();
324      getAuthenticationToken(null, request, callback);
325      try {
326        serverController.checkFailed();
327        return callback.get();
328      } catch (IOException ioe) {
329        throw new ServiceException(ioe);
330      }
331    }
332
333    @Override
334    public AuthenticationProtos.WhoAmIResponse whoAmI(
335        RpcController controller, AuthenticationProtos.WhoAmIRequest request)
336      throws ServiceException {
337      LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null));
338      // Ignore above passed in controller -- it is always null
339      ServerRpcController serverController = new ServerRpcController();
340      NonShadedBlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
341          new NonShadedBlockingRpcCallback<>();
342      whoAmI(null, request, callback);
343      try {
344        serverController.checkFailed();
345        return callback.get();
346      } catch (IOException ioe) {
347        throw new ServiceException(ioe);
348      }
349    }
350
351    @Override
352    public ChoreService getChoreService() {
353      return null;
354    }
355
356    @Override
357    public ClusterConnection getClusterConnection() {
358      // TODO Auto-generated method stub
359      return null;
360    }
361
362    @Override
363    public Connection createConnection(Configuration conf) throws IOException {
364      return null;
365    }
366  }
367
368  @Parameters(name = "{index}: rpcServerImpl={0}")
369  public static Collection<Object[]> parameters() {
370    return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
371        new Object[] { NettyRpcServer.class.getName() });
372  }
373
  // RPC server implementation class name supplied by parameters(); setUp() stores it under
  // RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY.
  @Parameter(0)
  public String rpcServerImpl;

  // Testing utility created per test in setUp(); it owns the mini ZooKeeper cluster.
  private HBaseTestingUtility TEST_UTIL;
  // Server under test and the daemon thread that runs it; both are created in setUp().
  private TokenServer server;
  private Thread serverThread;
  // Secret manager obtained from the running server in setUp().
  private AuthenticationTokenSecretManager secretManager;
  // Cluster id that setUp() publishes to ZooKeeper via ZKClusterId.setClusterId.
  private ClusterId clusterId = new ClusterId();
382
383  @Before
384  public void setUp() throws Exception {
385    TEST_UTIL = new HBaseTestingUtility();
386    TEST_UTIL.startMiniZKCluster();
387    // register token type for protocol
388    SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
389      new SecurityInfo("hbase.test.kerberos.principal",
390        AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
391    // security settings only added after startup so that ZK does not require SASL
392    Configuration conf = TEST_UTIL.getConfiguration();
393    conf.set("hadoop.security.authentication", "kerberos");
394    conf.set("hbase.security.authentication", "kerberos");
395    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
396    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
397    server = new TokenServer(conf, TEST_UTIL);
398    serverThread = new Thread(server);
399    Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString());
400    // wait for startup
401    while (!server.isStarted() && !server.isStopped()) {
402      Thread.sleep(10);
403    }
404    server.rpcServer.refreshAuthManager(new PolicyProvider() {
405      @Override
406      public Service[] getServices() {
407        return new Service [] {
408          new Service("security.client.protocol.acl",
409            AuthenticationProtos.AuthenticationService.BlockingInterface.class)};
410      }
411    });
412    ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
413    secretManager = (AuthenticationTokenSecretManager)server.getSecretManager();
414    while(secretManager.getCurrentKey() == null) {
415      Thread.sleep(1);
416    }
417  }
418
419  @After
420  public void tearDown() throws Exception {
421    server.stop("Test complete");
422    Threads.shutdown(serverThread);
423    TEST_UTIL.shutdownMiniZKCluster();
424  }
425
426  @Test
427  public void testTokenCreation() throws Exception {
428    Token<AuthenticationTokenIdentifier> token =
429        secretManager.generateToken("testuser");
430
431    AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
432    Writables.getWritable(token.getIdentifier(), ident);
433    assertEquals("Token username should match", "testuser",
434        ident.getUsername());
435    byte[] passwd = secretManager.retrievePassword(ident);
436    assertTrue("Token password and password from secret manager should match",
437        Bytes.equals(token.getPassword(), passwd));
438  }
// This won't work any more now that RpcServer takes a shaded Service. It depends on RpcServer
// being able to provide a non-shaded service. TODO: FIX. Tried to make RPC generic but then it
// ripples; we would have to make Connection generic, and Call generic, etc.
442//
443//  @Test
444//  public void testTokenAuthentication() throws Exception {
445//    UserGroupInformation testuser =
446//        UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
447//    testuser.setAuthenticationMethod(
448//        UserGroupInformation.AuthenticationMethod.TOKEN);
449//    final Configuration conf = TEST_UTIL.getConfiguration();
450//    UserGroupInformation.setConfiguration(conf);
451//    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
452//    LOG.debug("Got token: " + token.toString());
453//    testuser.addToken(token);
454//    // Verify the server authenticates us as this token user
455//    testuser.doAs(new PrivilegedExceptionAction<Object>() {
456//      public Object run() throws Exception {
457//        Configuration c = server.getConfiguration();
458//        final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
459//        ServerName sn =
460//            ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
461//                System.currentTimeMillis());
462//        try {
463//          // Make a proxy to go between the shaded RpcController that rpc expects and the
464//          // non-shaded controller this CPEP is providing. This is because this test does a neat
465//          // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
466//          // worked fine before we shaded PB. Now we need these proxies.
467//          final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel =
468//              rpcClient.createBlockingRpcChannel(sn, User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
469//          AuthenticationProtos.AuthenticationService.BlockingInterface stub =
470//              AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
471//          AuthenticationProtos.WhoAmIResponse response =
472//              stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
473//          String myname = response.getUsername();
474//          assertEquals("testuser", myname);
475//          String authMethod = response.getAuthMethod();
476//          assertEquals("TOKEN", authMethod);
477//        } finally {
478//          rpcClient.close();
479//        }
480//        return null;
481//      }
482//    });
483//  }
484
485  @Test
486  public void testUseExistingToken() throws Exception {
487    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
488        new String[]{"testgroup"});
489    Token<AuthenticationTokenIdentifier> token =
490        secretManager.generateToken(user.getName());
491    assertNotNull(token);
492    user.addToken(token);
493
494    // make sure we got a token
495    Token<AuthenticationTokenIdentifier> firstToken =
496        new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
497    assertNotNull(firstToken);
498    assertEquals(token, firstToken);
499
500    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
501    try {
502      assertFalse(TokenUtil.addTokenIfMissing(conn, user));
503      // make sure we still have the same token
504      Token<AuthenticationTokenIdentifier> secondToken =
505          new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
506      assertEquals(firstToken, secondToken);
507    } finally {
508      conn.close();
509    }
510  }
511
512  /**
513   * A copy of the BlockingRpcCallback class for use locally. Only difference is that it makes
514   * use of non-shaded protobufs; i.e. refers to com.google.protobuf.* rather than to
515   * org.apache.hbase.thirdparty.com.google.protobuf.*
516   */
517  private static class NonShadedBlockingRpcCallback<R> implements
518      com.google.protobuf.RpcCallback<R> {
519    private R result;
520    private boolean resultSet = false;
521
522    /**
523     * Called on completion of the RPC call with the response object, or {@code null} in the case of
524     * an error.
525     * @param parameter the response object or {@code null} if an error occurred
526     */
527    @Override
528    public void run(R parameter) {
529      synchronized (this) {
530        result = parameter;
531        resultSet = true;
532        this.notifyAll();
533      }
534    }
535
536    /**
537     * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
538     * passed.  When used asynchronously, this method will block until the {@link #run(Object)}
539     * method has been called.
540     * @return the response object or {@code null} if no response was passed
541     */
542    public synchronized R get() throws IOException {
543      while (!resultSet) {
544        try {
545          this.wait();
546        } catch (InterruptedException ie) {
547          InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
548          exception.initCause(ie);
549          throw exception;
550        }
551      }
552      return result;
553    }
554  }
555}