001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.token;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import com.google.protobuf.BlockingService;
029import com.google.protobuf.RpcController;
030import com.google.protobuf.ServiceException;
031import java.io.IOException;
032import java.io.InterruptedIOException;
033import java.net.InetSocketAddress;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.List;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.hbase.ChoreService;
041import org.apache.hadoop.hbase.ClusterId;
042import org.apache.hadoop.hbase.CoordinatedStateManager;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.Server;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.client.ClusterConnection;
049import org.apache.hadoop.hbase.client.Connection;
050import org.apache.hadoop.hbase.client.ConnectionFactory;
051import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
053import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
054import org.apache.hadoop.hbase.ipc.NettyRpcServer;
055import org.apache.hadoop.hbase.ipc.RpcServer;
056import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
057import org.apache.hadoop.hbase.ipc.RpcServerFactory;
058import org.apache.hadoop.hbase.ipc.RpcServerInterface;
059import org.apache.hadoop.hbase.ipc.ServerRpcController;
060import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
061import org.apache.hadoop.hbase.log.HBaseMarkers;
062import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
063import org.apache.hadoop.hbase.regionserver.RegionServerServices;
064import org.apache.hadoop.hbase.security.SecurityInfo;
065import org.apache.hadoop.hbase.security.User;
066import org.apache.hadoop.hbase.testclassification.MediumTests;
067import org.apache.hadoop.hbase.testclassification.SecurityTests;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
070import org.apache.hadoop.hbase.util.Sleeper;
071import org.apache.hadoop.hbase.util.Strings;
072import org.apache.hadoop.hbase.util.Threads;
073import org.apache.hadoop.hbase.util.Writables;
074import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
075import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
076import org.apache.hadoop.net.DNS;
077import org.apache.hadoop.security.authorize.PolicyProvider;
078import org.apache.hadoop.security.authorize.Service;
079import org.apache.hadoop.security.token.SecretManager;
080import org.apache.hadoop.security.token.Token;
081import org.apache.hadoop.security.token.TokenIdentifier;
082import org.junit.After;
083import org.junit.Before;
084import org.junit.ClassRule;
085import org.junit.Test;
086import org.junit.experimental.categories.Category;
087import org.junit.runner.RunWith;
088import org.junit.runners.Parameterized;
089import org.junit.runners.Parameterized.Parameter;
090import org.junit.runners.Parameterized.Parameters;
091import org.mockito.Mockito;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
095import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
096import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
097import org.apache.hbase.thirdparty.com.google.protobuf.Message;
098
099/**
100 * Tests for authentication token creation and usage
101 */
// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer
// to offer up. It worked fine pre-hbase-2.0.0 but post the shading project, it fails because
// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded
// protobufs. Since hbase-2.0.0, we added conversion from shaded to non-shaded so this test keeps
// working.
107@RunWith(Parameterized.class)
108@Category({SecurityTests.class, MediumTests.class})
109public class TestTokenAuthentication {
110
111  @ClassRule
112  public static final HBaseClassTestRule CLASS_RULE =
113      HBaseClassTestRule.forClass(TestTokenAuthentication.class);
114
115  static {
116    // Setting whatever system properties after recommendation from
117    // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html
118    System.setProperty("java.security.krb5.realm", "hbase");
119    System.setProperty("java.security.krb5.kdc", "blah");
120  }
121
122  /**
123   * Basic server process for RPC authentication testing
124   */
125  private static class TokenServer extends TokenProvider implements
126      AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
127    private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class);
128    private Configuration conf;
129    private HBaseTestingUtility TEST_UTIL;
130    private RpcServerInterface rpcServer;
131    private InetSocketAddress isa;
132    private ZKWatcher zookeeper;
133    private Sleeper sleeper;
134    private boolean started = false;
135    private boolean aborted = false;
136    private boolean stopped = false;
137    private long startcode;
138
139    public TokenServer(Configuration conf, HBaseTestingUtility TEST_UTIL) throws IOException {
140      this.conf = conf;
141      this.TEST_UTIL = TEST_UTIL;
142      this.startcode = EnvironmentEdgeManager.currentTime();
143      // Server to handle client requests.
144      String hostname =
145        Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
146      int port = 0;
147      // Creation of an ISA will force a resolve.
148      InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
149      if (initialIsa.getAddress() == null) {
150        throw new IllegalArgumentException("Failed resolve of " + initialIsa);
151      }
152      final List<BlockingServiceAndInterface> sai = new ArrayList<>(1);
153      // Make a proxy to go between the shaded Service that rpc expects and the
154      // non-shaded Service this CPEP is providing. This is because this test does a neat
155      // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
156      // worked fine before we shaded PB. Now we need these proxies.
157      final BlockingService service =
158        AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
159      final org.apache.hbase.thirdparty.com.google.protobuf.BlockingService proxy =
160        new org.apache.hbase.thirdparty.com.google.protobuf.BlockingService() {
161          @Override
162          public Message callBlockingMethod(MethodDescriptor md,
163              org.apache.hbase.thirdparty.com.google.protobuf.RpcController controller,
164              Message param)
165              throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
166            com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor =
167              service.getDescriptorForType().findMethodByName(md.getName());
168            com.google.protobuf.Message request = service.getRequestPrototype(methodDescriptor);
169            // TODO: Convert rpcController
170            com.google.protobuf.Message response = null;
171            try {
172              response = service.callBlockingMethod(methodDescriptor, null, request);
173            } catch (ServiceException e) {
174              throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e);
175            }
176            return null;// Convert 'response'.
177          }
178
179          @Override
180          public ServiceDescriptor getDescriptorForType() {
181            return null;
182          }
183
184          @Override
185          public Message getRequestPrototype(MethodDescriptor arg0) {
186            // TODO Auto-generated method stub
187            return null;
188          }
189
190          @Override
191          public Message getResponsePrototype(MethodDescriptor arg0) {
192            // TODO Auto-generated method stub
193            return null;
194          }
195        };
196      sai.add(new BlockingServiceAndInterface(proxy,
197        AuthenticationProtos.AuthenticationService.BlockingInterface.class));
198      this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf,
199          new FifoRpcScheduler(conf, 1));
200      InetSocketAddress address = rpcServer.getListenerAddress();
201      if (address == null) {
202        throw new IOException("Listener channel is closed");
203      }
204      this.isa = address;
205      this.sleeper = new Sleeper(1000, this);
206    }
207
208    @Override
209    public Configuration getConfiguration() {
210      return conf;
211    }
212
213    @Override
214    public ClusterConnection getConnection() {
215      return null;
216    }
217
218    @Override
219    public ZKWatcher getZooKeeper() {
220      return zookeeper;
221    }
222
223    @Override
224    public CoordinatedStateManager getCoordinatedStateManager() {
225      return null;
226    }
227
228    @Override
229    public boolean isAborted() {
230      return aborted;
231    }
232
233    @Override
234    public ServerName getServerName() {
235      return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
236    }
237
238    @Override
239    public FileSystem getFileSystem() {
240      return null;
241    }
242
243    @Override
244    public boolean isStopping() {
245      return this.stopped;
246    }
247
248    @Override
249    public void abort(String reason, Throwable error) {
250      LOG.error(HBaseMarkers.FATAL, "Aborting on: "+reason, error);
251      this.aborted = true;
252      this.stopped = true;
253      sleeper.skipSleepCycle();
254    }
255
256    private void initialize() throws IOException {
257      // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
258      Configuration zkConf = new Configuration(conf);
259      zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
260      this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(),
261          this, true);
262      this.rpcServer.start();
263
264      // Mock up region coprocessor environment
265      RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class,
266          Mockito.withSettings().extraInterfaces(HasRegionServerServices.class));
267      when(mockRegionCpEnv.getConfiguration()).thenReturn(conf);
268      when(mockRegionCpEnv.getClassLoader()).then(
269          (var1) -> Thread.currentThread().getContextClassLoader());
270      RegionServerServices mockRss = mock(RegionServerServices.class);
271      when(mockRss.getRpcServer()).thenReturn(rpcServer);
272      when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices())
273          .thenReturn(mockRss);
274
275      super.start(mockRegionCpEnv);
276      started = true;
277    }
278
279    @Override
280    public void run() {
281      try {
282        initialize();
283        while (!stopped) {
284          this.sleeper.sleep();
285        }
286      } catch (Exception e) {
287        abort(e.getMessage(), e);
288      }
289      this.rpcServer.stop();
290    }
291
292    public boolean isStarted() {
293      return started;
294    }
295
296    @Override
297    public void stop(String reason) {
298      LOG.info("Stopping due to: "+reason);
299      this.stopped = true;
300      sleeper.skipSleepCycle();
301    }
302
303    @Override
304    public boolean isStopped() {
305      return stopped;
306    }
307
308    public InetSocketAddress getAddress() {
309      return isa;
310    }
311
312    public SecretManager<? extends TokenIdentifier> getSecretManager() {
313      return ((RpcServer)rpcServer).getSecretManager();
314    }
315
316    @Override
317    public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
318        RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
319      throws ServiceException {
320      LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null));
321      // Ignore above passed in controller -- it is always null
322      ServerRpcController serverController = new ServerRpcController();
323      final NonShadedBlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>
324        callback = new NonShadedBlockingRpcCallback<>();
325      getAuthenticationToken(null, request, callback);
326      try {
327        serverController.checkFailed();
328        return callback.get();
329      } catch (IOException ioe) {
330        throw new ServiceException(ioe);
331      }
332    }
333
334    @Override
335    public AuthenticationProtos.WhoAmIResponse whoAmI(
336        RpcController controller, AuthenticationProtos.WhoAmIRequest request)
337      throws ServiceException {
338      LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null));
339      // Ignore above passed in controller -- it is always null
340      ServerRpcController serverController = new ServerRpcController();
341      NonShadedBlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
342          new NonShadedBlockingRpcCallback<>();
343      whoAmI(null, request, callback);
344      try {
345        serverController.checkFailed();
346        return callback.get();
347      } catch (IOException ioe) {
348        throw new ServiceException(ioe);
349      }
350    }
351
352    @Override
353    public ChoreService getChoreService() {
354      return null;
355    }
356
357    @Override
358    public ClusterConnection getClusterConnection() {
359      // TODO Auto-generated method stub
360      return null;
361    }
362
363    @Override
364    public Connection createConnection(Configuration conf) throws IOException {
365      return null;
366    }
367  }
368
369  @Parameters(name = "{index}: rpcServerImpl={0}")
370  public static Collection<Object[]> parameters() {
371    return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
372        new Object[] { NettyRpcServer.class.getName() });
373  }
374
375  @Parameter(0)
376  public String rpcServerImpl;
377
378  private HBaseTestingUtility TEST_UTIL;
379  private TokenServer server;
380  private Thread serverThread;
381  private AuthenticationTokenSecretManager secretManager;
382  private ClusterId clusterId = new ClusterId();
383
384  @Before
385  public void setUp() throws Exception {
386    TEST_UTIL = new HBaseTestingUtility();
387    // Override the connection registry to avoid spinning up a mini cluster for the connection below
388    // to go through.
389    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
390        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
391    TEST_UTIL.startMiniZKCluster();
392    // register token type for protocol
393    SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
394      new SecurityInfo("hbase.test.kerberos.principal",
395        AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
396    // security settings only added after startup so that ZK does not require SASL
397    Configuration conf = TEST_UTIL.getConfiguration();
398    conf.set("hadoop.security.authentication", "kerberos");
399    conf.set("hbase.security.authentication", "kerberos");
400    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
401    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
402    server = new TokenServer(conf, TEST_UTIL);
403    serverThread = new Thread(server);
404    Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString());
405    // wait for startup
406    while (!server.isStarted() && !server.isStopped()) {
407      Thread.sleep(10);
408    }
409    server.rpcServer.refreshAuthManager(conf, new PolicyProvider() {
410      @Override
411      public Service[] getServices() {
412        return new Service [] {
413          new Service("security.client.protocol.acl",
414            AuthenticationProtos.AuthenticationService.BlockingInterface.class)};
415      }
416    });
417    ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
418    secretManager = (AuthenticationTokenSecretManager)server.getSecretManager();
419    while(secretManager.getCurrentKey() == null) {
420      Thread.sleep(1);
421    }
422  }
423
424  @After
425  public void tearDown() throws Exception {
426    server.stop("Test complete");
427    Threads.shutdown(serverThread);
428    TEST_UTIL.shutdownMiniZKCluster();
429  }
430
431  @Test
432  public void testTokenCreation() throws Exception {
433    Token<AuthenticationTokenIdentifier> token =
434        secretManager.generateToken("testuser");
435
436    AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
437    Writables.getWritable(token.getIdentifier(), ident);
438    assertEquals("Token username should match", "testuser",
439        ident.getUsername());
440    byte[] passwd = secretManager.retrievePassword(ident);
441    assertTrue("Token password and password from secret manager should match",
442        Bytes.equals(token.getPassword(), passwd));
443  }
444// This won't work any more now RpcServer takes Shaded Service. It depends on RPCServer being able to provide a
445// non-shaded service. TODO: FIX. Tried to make RPC generic but then it ripples; have to make Connection generic.
446// And Call generic, etc.
447//
448//  @Test
449//  public void testTokenAuthentication() throws Exception {
450//    UserGroupInformation testuser =
451//        UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
452//    testuser.setAuthenticationMethod(
453//        UserGroupInformation.AuthenticationMethod.TOKEN);
454//    final Configuration conf = TEST_UTIL.getConfiguration();
455//    UserGroupInformation.setConfiguration(conf);
456//    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
457//    LOG.debug("Got token: " + token.toString());
458//    testuser.addToken(token);
459//    // Verify the server authenticates us as this token user
460//    testuser.doAs(new PrivilegedExceptionAction<Object>() {
461//      public Object run() throws Exception {
462//        Configuration c = server.getConfiguration();
463//        final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
464//        ServerName sn =
465//            ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
466//                System.currentTimeMillis());
467//        try {
468//          // Make a proxy to go between the shaded RpcController that rpc expects and the
469//          // non-shaded controller this CPEP is providing. This is because this test does a neat
470//          // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
471//          // worked fine before we shaded PB. Now we need these proxies.
472//          final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel =
473//              rpcClient.createBlockingRpcChannel(sn, User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
474//          AuthenticationProtos.AuthenticationService.BlockingInterface stub =
475//              AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
476//          AuthenticationProtos.WhoAmIResponse response =
477//              stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
478//          String myname = response.getUsername();
479//          assertEquals("testuser", myname);
480//          String authMethod = response.getAuthMethod();
481//          assertEquals("TOKEN", authMethod);
482//        } finally {
483//          rpcClient.close();
484//        }
485//        return null;
486//      }
487//    });
488//  }
489
490  @Test
491  public void testUseExistingToken() throws Exception {
492    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
493        new String[]{"testgroup"});
494    Token<AuthenticationTokenIdentifier> token =
495        secretManager.generateToken(user.getName());
496    assertNotNull(token);
497    user.addToken(token);
498
499    // make sure we got a token
500    Token<AuthenticationTokenIdentifier> firstToken =
501        new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
502    assertNotNull(firstToken);
503    assertEquals(token, firstToken);
504
505    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
506    try {
507      assertFalse(TokenUtil.addTokenIfMissing(conn, user));
508      // make sure we still have the same token
509      Token<AuthenticationTokenIdentifier> secondToken =
510          new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
511      assertEquals(firstToken, secondToken);
512    } finally {
513      conn.close();
514    }
515  }
516
517  /**
518   * A copy of the BlockingRpcCallback class for use locally. Only difference is that it makes
519   * use of non-shaded protobufs; i.e. refers to com.google.protobuf.* rather than to
520   * org.apache.hbase.thirdparty.com.google.protobuf.*
521   */
522  private static class NonShadedBlockingRpcCallback<R> implements
523      com.google.protobuf.RpcCallback<R> {
524    private R result;
525    private boolean resultSet = false;
526
527    /**
528     * Called on completion of the RPC call with the response object, or {@code null} in the case of
529     * an error.
530     * @param parameter the response object or {@code null} if an error occurred
531     */
532    @Override
533    public void run(R parameter) {
534      synchronized (this) {
535        result = parameter;
536        resultSet = true;
537        this.notifyAll();
538      }
539    }
540
541    /**
542     * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
543     * passed.  When used asynchronously, this method will block until the {@link #run(Object)}
544     * method has been called.
545     * @return the response object or {@code null} if no response was passed
546     */
547    public synchronized R get() throws IOException {
548      while (!resultSet) {
549        try {
550          this.wait();
551        } catch (InterruptedException ie) {
552          InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
553          exception.initCause(ie);
554          throw exception;
555        }
556      }
557      return result;
558    }
559  }
560}