001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
023import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
024import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
025import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
026import static org.junit.Assert.assertEquals;
027import static org.junit.Assert.assertNotSame;
028import static org.junit.Assert.assertSame;
029import static org.junit.Assert.assertThrows;
030import static org.junit.Assert.fail;
031
032import java.io.File;
033import java.io.IOException;
034import java.lang.reflect.Field;
035import java.net.InetAddress;
036import java.net.InetSocketAddress;
037import java.util.ArrayList;
038import java.util.Arrays;
039import java.util.Collection;
040import java.util.Collections;
041import java.util.List;
042import java.util.Map;
043import javax.security.sasl.SaslClient;
044import javax.security.sasl.SaslException;
045import org.apache.commons.lang3.RandomStringUtils;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.CommonConfigurationKeys;
048import org.apache.hadoop.hbase.HBaseClassTestRule;
049import org.apache.hadoop.hbase.HBaseTestingUtility;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
052import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
053import org.apache.hadoop.hbase.ipc.NettyRpcClient;
054import org.apache.hadoop.hbase.ipc.NettyRpcServer;
055import org.apache.hadoop.hbase.ipc.RpcClient;
056import org.apache.hadoop.hbase.ipc.RpcClientFactory;
057import org.apache.hadoop.hbase.ipc.RpcServer;
058import org.apache.hadoop.hbase.ipc.RpcServerFactory;
059import org.apache.hadoop.hbase.ipc.RpcServerInterface;
060import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
061import org.apache.hadoop.hbase.security.provider.AuthenticationProviderSelector;
062import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
063import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
064import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
065import org.apache.hadoop.hbase.testclassification.LargeTests;
066import org.apache.hadoop.hbase.testclassification.SecurityTests;
067import org.apache.hadoop.hbase.util.Pair;
068import org.apache.hadoop.minikdc.MiniKdc;
069import org.apache.hadoop.security.UserGroupInformation;
070import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
071import org.apache.hadoop.security.token.Token;
072import org.apache.hadoop.security.token.TokenIdentifier;
073import org.junit.AfterClass;
074import org.junit.Before;
075import org.junit.BeforeClass;
076import org.junit.ClassRule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.junit.runner.RunWith;
080import org.junit.runners.Parameterized;
081import org.junit.runners.Parameterized.Parameter;
082import org.junit.runners.Parameterized.Parameters;
083import org.mockito.Mockito;
084
085import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
086import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
087
088import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
089import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
091
092@RunWith(Parameterized.class)
093@Category({ SecurityTests.class, LargeTests.class })
094public class TestSecureIPC {
095
096  @ClassRule
097  public static final HBaseClassTestRule CLASS_RULE =
098    HBaseClassTestRule.forClass(TestSecureIPC.class);
099
100  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
101
102  private static final File KEYTAB_FILE =
103    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
104
105  private static MiniKdc KDC;
106  private static String HOST = "localhost";
107  private static String PRINCIPAL;
108
109  private String krbKeytab;
110  private String krbPrincipal;
111  private UserGroupInformation ugi;
112  private Configuration clientConf;
113  private Configuration serverConf;
114
115  @Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
116  public static Collection<Object[]> parameters() {
117    List<Object[]> params = new ArrayList<>();
118    List<String> rpcClientImpls =
119      Arrays.asList(BlockingRpcClient.class.getName(), NettyRpcClient.class.getName());
120    List<String> rpcServerImpls =
121      Arrays.asList(SimpleRpcServer.class.getName(), NettyRpcServer.class.getName());
122    for (String rpcClientImpl : rpcClientImpls) {
123      for (String rpcServerImpl : rpcServerImpls) {
124        params.add(new Object[] { rpcClientImpl, rpcServerImpl });
125      }
126    }
127    return params;
128  }
129
130  @Parameter(0)
131  public String rpcClientImpl;
132
133  @Parameter(1)
134  public String rpcServerImpl;
135
136  @BeforeClass
137  public static void setUp() throws Exception {
138    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
139    PRINCIPAL = "hbase/" + HOST;
140    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
141    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
142    // set a smaller timeout and retry to speed up tests
143    TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000);
144    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
145  }
146
147  @AfterClass
148  public static void tearDown() throws IOException {
149    if (KDC != null) {
150      KDC.stop();
151    }
152    TEST_UTIL.cleanupTestDir();
153  }
154
155  @Before
156  public void setUpTest() throws Exception {
157    krbKeytab = getKeytabFileForTesting();
158    krbPrincipal = getPrincipalForTesting();
159    ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
160    clientConf = new Configuration(TEST_UTIL.getConfiguration());
161    setSecuredConfiguration(clientConf);
162    clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
163    serverConf = new Configuration(TEST_UTIL.getConfiguration());
164    setSecuredConfiguration(serverConf);
165    serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
166  }
167
168  @Test
169  public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
170    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
171
172    // check that the login user is okay:
173    assertSame(ugi2, ugi);
174    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
175    assertEquals(krbPrincipal, ugi.getUserName());
176
177    callRpcService(User.create(ugi2));
178  }
179
180  @Test
181  public void testRpcCallWithEnabledKerberosSaslAuth_CanonicalHostname() throws Exception {
182    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
183
184    // check that the login user is okay:
185    assertSame(ugi2, ugi);
186    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
187    assertEquals(krbPrincipal, ugi.getUserName());
188
189    enableCanonicalHostnameTesting(clientConf, "localhost");
190    clientConf.setBoolean(
191      SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, false);
192    clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
193
194    callRpcService(User.create(ugi2));
195  }
196
197  @Test
198  public void testRpcCallWithEnabledKerberosSaslAuth_NoCanonicalHostname() throws Exception {
199    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
200
201    // check that the login user is okay:
202    assertSame(ugi2, ugi);
203    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
204    assertEquals(krbPrincipal, ugi.getUserName());
205
206    enableCanonicalHostnameTesting(clientConf, "127.0.0.1");
207    clientConf
208      .setBoolean(SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, true);
209    clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
210
211    callRpcService(User.create(ugi2));
212  }
213
214  private static void enableCanonicalHostnameTesting(Configuration conf, String canonicalHostname) {
215    conf.setClass(SELECTOR_KEY, CanonicalHostnameTestingAuthenticationProviderSelector.class,
216      AuthenticationProviderSelector.class);
217    conf.set(CanonicalHostnameTestingAuthenticationProviderSelector.CANONICAL_HOST_NAME_KEY,
218      canonicalHostname);
219  }
220
221  public static class CanonicalHostnameTestingAuthenticationProviderSelector
222    extends BuiltInProviderSelector {
223    private static final String CANONICAL_HOST_NAME_KEY =
224      "CanonicalHostnameTestingAuthenticationProviderSelector.canonicalHostName";
225
226    @Override
227    public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
228      selectProvider(String clusterId, User user) {
229      final Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair =
230        super.selectProvider(clusterId, user);
231      pair.setFirst(createCanonicalHostNameTestingProvider(pair.getFirst()));
232      return pair;
233    }
234
235    SaslClientAuthenticationProvider
236      createCanonicalHostNameTestingProvider(SaslClientAuthenticationProvider delegate) {
237      return new SaslClientAuthenticationProvider() {
238        @Override
239        public SaslClient createClient(Configuration conf, InetAddress serverAddr,
240          SecurityInfo securityInfo, Token<? extends TokenIdentifier> token,
241          boolean fallbackAllowed, Map<String, String> saslProps) throws IOException {
242          final String s = conf.get(CANONICAL_HOST_NAME_KEY);
243          if (s != null) {
244            try {
245              final Field canonicalHostName =
246                InetAddress.class.getDeclaredField("canonicalHostName");
247              canonicalHostName.setAccessible(true);
248              canonicalHostName.set(serverAddr, s);
249            } catch (NoSuchFieldException | IllegalAccessException e) {
250              throw new RuntimeException(e);
251            }
252          }
253
254          return delegate.createClient(conf, serverAddr, securityInfo, token, fallbackAllowed,
255            saslProps);
256        }
257
258        @Override
259        public UserInformation getUserInfo(User user) {
260          return delegate.getUserInfo(user);
261        }
262
263        @Override
264        public UserGroupInformation getRealUser(User ugi) {
265          return delegate.getRealUser(ugi);
266        }
267
268        @Override
269        public boolean canRetry() {
270          return delegate.canRetry();
271        }
272
273        @Override
274        public void relogin() throws IOException {
275          delegate.relogin();
276        }
277
278        @Override
279        public SaslAuthMethod getSaslAuthMethod() {
280          return delegate.getSaslAuthMethod();
281        }
282
283        @Override
284        public String getTokenKind() {
285          return delegate.getTokenKind();
286        }
287      };
288    }
289  }
290
291  @Test
292  public void testRpcFallbackToSimpleAuth() throws Exception {
293    String clientUsername = "testuser";
294    UserGroupInformation clientUgi =
295      UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
296
297    // check that the client user is insecure
298    assertNotSame(ugi, clientUgi);
299    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
300    assertEquals(clientUsername, clientUgi.getUserName());
301
302    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
303    serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
304    callRpcService(User.create(clientUgi));
305  }
306
307  private void setRpcProtection(String clientProtection, String serverProtection) {
308    clientConf.set("hbase.rpc.protection", clientProtection);
309    serverConf.set("hbase.rpc.protection", serverProtection);
310  }
311
312  /**
313   * Test various combinations of Server and Client qops. n
314   */
315  @Test
316  public void testSaslWithCommonQop() throws Exception {
317    setRpcProtection("privacy,authentication", "authentication");
318    callRpcService(User.create(ugi));
319
320    setRpcProtection("authentication", "privacy,authentication");
321    callRpcService(User.create(ugi));
322
323    setRpcProtection("integrity,authentication", "privacy,authentication");
324    callRpcService(User.create(ugi));
325
326    setRpcProtection("integrity,authentication", "integrity,authentication");
327    callRpcService(User.create(ugi));
328
329    setRpcProtection("privacy,authentication", "privacy,authentication");
330    callRpcService(User.create(ugi));
331  }
332
333  @Test
334  public void testSaslNoCommonQop() throws Exception {
335    setRpcProtection("integrity", "privacy");
336    SaslException se = assertThrows(SaslException.class, () -> callRpcService(User.create(ugi)));
337    assertEquals("No common protection layer between client and server", se.getMessage());
338  }
339
340  /**
341   * Test sasl encryption with Crypto AES. n
342   */
343  @Test
344  public void testSaslWithCryptoAES() throws Exception {
345    setRpcProtection("privacy", "privacy");
346    setCryptoAES("true", "true");
347    callRpcService(User.create(ugi));
348  }
349
350  /**
351   * Test various combinations of Server and Client configuration for Crypto AES. n
352   */
353  @Test
354  public void testDifferentConfWithCryptoAES() throws Exception {
355    setRpcProtection("privacy", "privacy");
356
357    setCryptoAES("false", "true");
358    callRpcService(User.create(ugi));
359
360    setCryptoAES("true", "false");
361    try {
362      callRpcService(User.create(ugi));
363      fail("The exception should be thrown out for the rpc timeout.");
364    } catch (Exception e) {
365      // ignore the expected exception
366    }
367  }
368
369  void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
370    clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
371    serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
372  }
373
374  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
375    throws Exception {
376    Configuration cnf = new Configuration();
377    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
378    UserGroupInformation.setConfiguration(cnf);
379    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
380    return UserGroupInformation.getLoginUser();
381  }
382
383  /**
384   * Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
385   * the stub, this function will throw root cause of that exception.
386   */
387  private void callRpcService(User clientUser) throws Exception {
388    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
389    Mockito.when(securityInfoMock.getServerPrincipal())
390      .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
391    SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
392
393    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
394
395    RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
396      Lists
397        .newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
398      isa, serverConf, new FifoRpcScheduler(serverConf, 1));
399    rpcServer.start();
400    try (RpcClient rpcClient =
401      RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
402      BlockingInterface stub =
403        newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
404      TestThread th1 = new TestThread(stub);
405      final Throwable exception[] = new Throwable[1];
406      Collections.synchronizedList(new ArrayList<Throwable>());
407      Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
408        @Override
409        public void uncaughtException(Thread th, Throwable ex) {
410          exception[0] = ex;
411        }
412      };
413      th1.setUncaughtExceptionHandler(exceptionHandler);
414      th1.start();
415      th1.join();
416      if (exception[0] != null) {
417        // throw root cause.
418        while (exception[0].getCause() != null) {
419          exception[0] = exception[0].getCause();
420        }
421        throw (Exception) exception[0];
422      }
423    } finally {
424      rpcServer.stop();
425    }
426  }
427
428  public static class TestThread extends Thread {
429    private final BlockingInterface stub;
430
431    public TestThread(BlockingInterface stub) {
432      this.stub = stub;
433    }
434
435    @Override
436    public void run() {
437      try {
438        int[] messageSize = new int[] { 100, 1000, 10000 };
439        for (int i = 0; i < messageSize.length; i++) {
440          String input = RandomStringUtils.random(messageSize[i]);
441          String result =
442            stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
443              .getMessage();
444          assertEquals(input, result);
445        }
446      } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
447        throw new RuntimeException(e);
448      }
449    }
450  }
451}