001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.token;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertTrue;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.when;
027
028import com.google.protobuf.BlockingService;
029import com.google.protobuf.RpcController;
030import com.google.protobuf.ServiceException;
031import java.io.IOException;
032import java.io.InterruptedIOException;
033import java.net.InetSocketAddress;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collection;
037import java.util.List;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.hbase.ChoreService;
041import org.apache.hadoop.hbase.ClusterId;
042import org.apache.hadoop.hbase.CoordinatedStateManager;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.Server;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.client.ClusterConnection;
048import org.apache.hadoop.hbase.client.Connection;
049import org.apache.hadoop.hbase.client.ConnectionFactory;
050import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
052import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
053import org.apache.hadoop.hbase.ipc.NettyRpcServer;
054import org.apache.hadoop.hbase.ipc.RpcServer;
055import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
056import org.apache.hadoop.hbase.ipc.RpcServerFactory;
057import org.apache.hadoop.hbase.ipc.RpcServerInterface;
058import org.apache.hadoop.hbase.ipc.ServerRpcController;
059import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
060import org.apache.hadoop.hbase.log.HBaseMarkers;
061import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
062import org.apache.hadoop.hbase.regionserver.RegionServerServices;
063import org.apache.hadoop.hbase.security.SecurityInfo;
064import org.apache.hadoop.hbase.security.User;
065import org.apache.hadoop.hbase.testclassification.MediumTests;
066import org.apache.hadoop.hbase.testclassification.SecurityTests;
067import org.apache.hadoop.hbase.util.Bytes;
068import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
069import org.apache.hadoop.hbase.util.Sleeper;
070import org.apache.hadoop.hbase.util.Strings;
071import org.apache.hadoop.hbase.util.Threads;
072import org.apache.hadoop.hbase.util.Writables;
073import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
074import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
075import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
076import org.apache.hadoop.net.DNS;
077import org.apache.hadoop.security.authorize.PolicyProvider;
078import org.apache.hadoop.security.authorize.Service;
079import org.apache.hadoop.security.token.SecretManager;
080import org.apache.hadoop.security.token.Token;
081import org.apache.hadoop.security.token.TokenIdentifier;
082import org.junit.After;
083import org.junit.Before;
084import org.junit.ClassRule;
085import org.junit.Test;
086import org.junit.experimental.categories.Category;
087import org.junit.runner.RunWith;
088import org.junit.runners.Parameterized;
089import org.junit.runners.Parameterized.Parameter;
090import org.junit.runners.Parameterized.Parameters;
091import org.mockito.Mockito;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
095import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
096import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.ServiceDescriptor;
097import org.apache.hbase.thirdparty.com.google.protobuf.Message;
098
099/**
100 * Tests for authentication token creation and usage
101 */
// This test does a fancy trick where it uses RpcServer and plugs in the Token Service for RpcServer
// to offer up. It worked fine pre-hbase-2.0.0 but post the shading project, it fails because
// RpcServer is all about shaded protobuf whereas the Token Service is a CPEP which does non-shaded
// protobufs. Since hbase-2.0.0, we added conversion from shaded to non-shaded so this test keeps
// working.
107@RunWith(Parameterized.class)
108@Category({SecurityTests.class, MediumTests.class})
109public class TestTokenAuthentication {
110
111  @ClassRule
112  public static final HBaseClassTestRule CLASS_RULE =
113      HBaseClassTestRule.forClass(TestTokenAuthentication.class);
114
115  static {
116    // Setting whatever system properties after recommendation from
117    // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html
118    System.setProperty("java.security.krb5.realm", "hbase");
119    System.setProperty("java.security.krb5.kdc", "blah");
120  }
121
122  /**
123   * Basic server process for RPC authentication testing
124   */
125  private static class TokenServer extends TokenProvider implements
126      AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
127    private static final Logger LOG = LoggerFactory.getLogger(TokenServer.class);
128    private Configuration conf;
129    private HBaseTestingUtility TEST_UTIL;
130    private RpcServerInterface rpcServer;
131    private InetSocketAddress isa;
132    private ZKWatcher zookeeper;
133    private Sleeper sleeper;
134    private boolean started = false;
135    private boolean aborted = false;
136    private boolean stopped = false;
137    private long startcode;
138
139    public TokenServer(Configuration conf, HBaseTestingUtility TEST_UTIL) throws IOException {
140      this.conf = conf;
141      this.TEST_UTIL = TEST_UTIL;
142      this.startcode = EnvironmentEdgeManager.currentTime();
143      // Server to handle client requests.
144      String hostname =
145        Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
146      int port = 0;
147      // Creation of an ISA will force a resolve.
148      InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
149      if (initialIsa.getAddress() == null) {
150        throw new IllegalArgumentException("Failed resolve of " + initialIsa);
151      }
152      final List<BlockingServiceAndInterface> sai = new ArrayList<>(1);
153      // Make a proxy to go between the shaded Service that rpc expects and the
154      // non-shaded Service this CPEP is providing. This is because this test does a neat
155      // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
156      // worked fine before we shaded PB. Now we need these proxies.
157      final BlockingService service =
158        AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
159      final org.apache.hbase.thirdparty.com.google.protobuf.BlockingService proxy =
160        new org.apache.hbase.thirdparty.com.google.protobuf.BlockingService() {
161          @Override
162          public Message callBlockingMethod(MethodDescriptor md,
163              org.apache.hbase.thirdparty.com.google.protobuf.RpcController controller,
164              Message param)
165              throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
166            com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor =
167              service.getDescriptorForType().findMethodByName(md.getName());
168            com.google.protobuf.Message request = service.getRequestPrototype(methodDescriptor);
169            // TODO: Convert rpcController
170            com.google.protobuf.Message response = null;
171            try {
172              response = service.callBlockingMethod(methodDescriptor, null, request);
173            } catch (ServiceException e) {
174              throw new org.apache.hbase.thirdparty.com.google.protobuf.ServiceException(e);
175            }
176            return null;// Convert 'response'.
177          }
178
179          @Override
180          public ServiceDescriptor getDescriptorForType() {
181            return null;
182          }
183
184          @Override
185          public Message getRequestPrototype(MethodDescriptor arg0) {
186            // TODO Auto-generated method stub
187            return null;
188          }
189
190          @Override
191          public Message getResponsePrototype(MethodDescriptor arg0) {
192            // TODO Auto-generated method stub
193            return null;
194          }
195        };
196      sai.add(new BlockingServiceAndInterface(proxy,
197        AuthenticationProtos.AuthenticationService.BlockingInterface.class));
198      this.rpcServer = RpcServerFactory.createRpcServer(this, "tokenServer", sai, initialIsa, conf,
199          new FifoRpcScheduler(conf, 1));
200      InetSocketAddress address = rpcServer.getListenerAddress();
201      if (address == null) {
202        throw new IOException("Listener channel is closed");
203      }
204      this.isa = address;
205      this.sleeper = new Sleeper(1000, this);
206    }
207
208    @Override
209    public Configuration getConfiguration() {
210      return conf;
211    }
212
213    @Override
214    public ClusterConnection getConnection() {
215      return null;
216    }
217
218    @Override
219    public MetaTableLocator getMetaTableLocator() {
220      return null;
221    }
222
223    @Override
224    public ZKWatcher getZooKeeper() {
225      return zookeeper;
226    }
227
228    @Override
229    public CoordinatedStateManager getCoordinatedStateManager() {
230      return null;
231    }
232
233    @Override
234    public boolean isAborted() {
235      return aborted;
236    }
237
238    @Override
239    public ServerName getServerName() {
240      return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
241    }
242
243    @Override
244    public FileSystem getFileSystem() {
245      return null;
246    }
247
248    @Override
249    public boolean isStopping() {
250      return this.stopped;
251    }
252
253    @Override
254    public void abort(String reason, Throwable error) {
255      LOG.error(HBaseMarkers.FATAL, "Aborting on: "+reason, error);
256      this.aborted = true;
257      this.stopped = true;
258      sleeper.skipSleepCycle();
259    }
260
261    private void initialize() throws IOException {
262      // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
263      Configuration zkConf = new Configuration(conf);
264      zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
265      this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(),
266          this, true);
267      this.rpcServer.start();
268
269      // Mock up region coprocessor environment
270      RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class,
271          Mockito.withSettings().extraInterfaces(HasRegionServerServices.class));
272      when(mockRegionCpEnv.getConfiguration()).thenReturn(conf);
273      when(mockRegionCpEnv.getClassLoader()).then(
274          (var1) -> Thread.currentThread().getContextClassLoader());
275      RegionServerServices mockRss = mock(RegionServerServices.class);
276      when(mockRss.getRpcServer()).thenReturn(rpcServer);
277      when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices())
278          .thenReturn(mockRss);
279
280      super.start(mockRegionCpEnv);
281      started = true;
282    }
283
284    @Override
285    public void run() {
286      try {
287        initialize();
288        while (!stopped) {
289          this.sleeper.sleep();
290        }
291      } catch (Exception e) {
292        abort(e.getMessage(), e);
293      }
294      this.rpcServer.stop();
295    }
296
297    public boolean isStarted() {
298      return started;
299    }
300
301    @Override
302    public void stop(String reason) {
303      LOG.info("Stopping due to: "+reason);
304      this.stopped = true;
305      sleeper.skipSleepCycle();
306    }
307
308    @Override
309    public boolean isStopped() {
310      return stopped;
311    }
312
313    public InetSocketAddress getAddress() {
314      return isa;
315    }
316
317    public SecretManager<? extends TokenIdentifier> getSecretManager() {
318      return ((RpcServer)rpcServer).getSecretManager();
319    }
320
321    @Override
322    public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
323        RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
324      throws ServiceException {
325      LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null));
326      // Ignore above passed in controller -- it is always null
327      ServerRpcController serverController = new ServerRpcController();
328      final NonShadedBlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>
329        callback = new NonShadedBlockingRpcCallback<>();
330      getAuthenticationToken(null, request, callback);
331      try {
332        serverController.checkFailed();
333        return callback.get();
334      } catch (IOException ioe) {
335        throw new ServiceException(ioe);
336      }
337    }
338
339    @Override
340    public AuthenticationProtos.WhoAmIResponse whoAmI(
341        RpcController controller, AuthenticationProtos.WhoAmIRequest request)
342      throws ServiceException {
343      LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null));
344      // Ignore above passed in controller -- it is always null
345      ServerRpcController serverController = new ServerRpcController();
346      NonShadedBlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
347          new NonShadedBlockingRpcCallback<>();
348      whoAmI(null, request, callback);
349      try {
350        serverController.checkFailed();
351        return callback.get();
352      } catch (IOException ioe) {
353        throw new ServiceException(ioe);
354      }
355    }
356
357    @Override
358    public ChoreService getChoreService() {
359      return null;
360    }
361
362    @Override
363    public ClusterConnection getClusterConnection() {
364      // TODO Auto-generated method stub
365      return null;
366    }
367
368    @Override
369    public Connection createConnection(Configuration conf) throws IOException {
370      return null;
371    }
372  }
373
374  @Parameters(name = "{index}: rpcServerImpl={0}")
375  public static Collection<Object[]> parameters() {
376    return Arrays.asList(new Object[] { SimpleRpcServer.class.getName() },
377        new Object[] { NettyRpcServer.class.getName() });
378  }
379
380  @Parameter(0)
381  public String rpcServerImpl;
382
383  private HBaseTestingUtility TEST_UTIL;
384  private TokenServer server;
385  private Thread serverThread;
386  private AuthenticationTokenSecretManager secretManager;
387  private ClusterId clusterId = new ClusterId();
388
389  @Before
390  public void setUp() throws Exception {
391    TEST_UTIL = new HBaseTestingUtility();
392    TEST_UTIL.startMiniZKCluster();
393    // register token type for protocol
394    SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
395      new SecurityInfo("hbase.test.kerberos.principal",
396        AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
397    // security settings only added after startup so that ZK does not require SASL
398    Configuration conf = TEST_UTIL.getConfiguration();
399    conf.set("hadoop.security.authentication", "kerberos");
400    conf.set("hbase.security.authentication", "kerberos");
401    conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
402    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
403    server = new TokenServer(conf, TEST_UTIL);
404    serverThread = new Thread(server);
405    Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString());
406    // wait for startup
407    while (!server.isStarted() && !server.isStopped()) {
408      Thread.sleep(10);
409    }
410    server.rpcServer.refreshAuthManager(new PolicyProvider() {
411      @Override
412      public Service[] getServices() {
413        return new Service [] {
414          new Service("security.client.protocol.acl",
415            AuthenticationProtos.AuthenticationService.BlockingInterface.class)};
416      }
417    });
418    ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
419    secretManager = (AuthenticationTokenSecretManager)server.getSecretManager();
420    while(secretManager.getCurrentKey() == null) {
421      Thread.sleep(1);
422    }
423  }
424
425  @After
426  public void tearDown() throws Exception {
427    server.stop("Test complete");
428    Threads.shutdown(serverThread);
429    TEST_UTIL.shutdownMiniZKCluster();
430  }
431
432  @Test
433  public void testTokenCreation() throws Exception {
434    Token<AuthenticationTokenIdentifier> token =
435        secretManager.generateToken("testuser");
436
437    AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
438    Writables.getWritable(token.getIdentifier(), ident);
439    assertEquals("Token username should match", "testuser",
440        ident.getUsername());
441    byte[] passwd = secretManager.retrievePassword(ident);
442    assertTrue("Token password and password from secret manager should match",
443        Bytes.equals(token.getPassword(), passwd));
444  }
445// This won't work any more now RpcServer takes Shaded Service. It depends on RPCServer being able to provide a
446// non-shaded service. TODO: FIX. Tried to make RPC generic but then it ripples; have to make Connection generic.
447// And Call generic, etc.
448//
449//  @Test
450//  public void testTokenAuthentication() throws Exception {
451//    UserGroupInformation testuser =
452//        UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
453//    testuser.setAuthenticationMethod(
454//        UserGroupInformation.AuthenticationMethod.TOKEN);
455//    final Configuration conf = TEST_UTIL.getConfiguration();
456//    UserGroupInformation.setConfiguration(conf);
457//    Token<AuthenticationTokenIdentifier> token = secretManager.generateToken("testuser");
458//    LOG.debug("Got token: " + token.toString());
459//    testuser.addToken(token);
460//    // Verify the server authenticates us as this token user
461//    testuser.doAs(new PrivilegedExceptionAction<Object>() {
462//      public Object run() throws Exception {
463//        Configuration c = server.getConfiguration();
464//        final RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
465//        ServerName sn =
466//            ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
467//                System.currentTimeMillis());
468//        try {
469//          // Make a proxy to go between the shaded RpcController that rpc expects and the
470//          // non-shaded controller this CPEP is providing. This is because this test does a neat
471//          // little trick of testing the CPEP Service by inserting it as RpcServer Service. This
472//          // worked fine before we shaded PB. Now we need these proxies.
473//          final org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel channel =
474//              rpcClient.createBlockingRpcChannel(sn, User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
475//          AuthenticationProtos.AuthenticationService.BlockingInterface stub =
476//              AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
477//          AuthenticationProtos.WhoAmIResponse response =
478//              stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
479//          String myname = response.getUsername();
480//          assertEquals("testuser", myname);
481//          String authMethod = response.getAuthMethod();
482//          assertEquals("TOKEN", authMethod);
483//        } finally {
484//          rpcClient.close();
485//        }
486//        return null;
487//      }
488//    });
489//  }
490
491  @Test
492  public void testUseExistingToken() throws Exception {
493    User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
494        new String[]{"testgroup"});
495    Token<AuthenticationTokenIdentifier> token =
496        secretManager.generateToken(user.getName());
497    assertNotNull(token);
498    user.addToken(token);
499
500    // make sure we got a token
501    Token<AuthenticationTokenIdentifier> firstToken =
502        new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
503    assertNotNull(firstToken);
504    assertEquals(token, firstToken);
505
506    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
507    try {
508      assertFalse(TokenUtil.addTokenIfMissing(conn, user));
509      // make sure we still have the same token
510      Token<AuthenticationTokenIdentifier> secondToken =
511          new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
512      assertEquals(firstToken, secondToken);
513    } finally {
514      conn.close();
515    }
516  }
517
518  /**
519   * A copy of the BlockingRpcCallback class for use locally. Only difference is that it makes
520   * use of non-shaded protobufs; i.e. refers to com.google.protobuf.* rather than to
521   * org.apache.hbase.thirdparty.com.google.protobuf.*
522   */
523  private static class NonShadedBlockingRpcCallback<R> implements
524      com.google.protobuf.RpcCallback<R> {
525    private R result;
526    private boolean resultSet = false;
527
528    /**
529     * Called on completion of the RPC call with the response object, or {@code null} in the case of
530     * an error.
531     * @param parameter the response object or {@code null} if an error occurred
532     */
533    @Override
534    public void run(R parameter) {
535      synchronized (this) {
536        result = parameter;
537        resultSet = true;
538        this.notifyAll();
539      }
540    }
541
542    /**
543     * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
544     * passed.  When used asynchronously, this method will block until the {@link #run(Object)}
545     * method has been called.
546     * @return the response object or {@code null} if no response was passed
547     */
548    public synchronized R get() throws IOException {
549      while (!resultSet) {
550        try {
551          this.wait();
552        } catch (InterruptedException ie) {
553          InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
554          exception.initCause(ie);
555          throw exception;
556        }
557      }
558      return result;
559    }
560  }
561}