001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
023import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
024import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertNotSame;
027import static org.junit.Assert.assertSame;
028import static org.junit.Assert.fail;
029
030import java.io.File;
031import java.io.IOException;
032import java.net.InetSocketAddress;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.List;
038import javax.security.sasl.SaslException;
039import org.apache.commons.lang3.RandomStringUtils;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.CommonConfigurationKeys;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
046import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
047import org.apache.hadoop.hbase.ipc.NettyRpcClient;
048import org.apache.hadoop.hbase.ipc.NettyRpcServer;
049import org.apache.hadoop.hbase.ipc.RpcClient;
050import org.apache.hadoop.hbase.ipc.RpcClientFactory;
051import org.apache.hadoop.hbase.ipc.RpcServer;
052import org.apache.hadoop.hbase.ipc.RpcServerFactory;
053import org.apache.hadoop.hbase.ipc.RpcServerInterface;
054import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.testclassification.SecurityTests;
057import org.apache.hadoop.minikdc.MiniKdc;
058import org.apache.hadoop.security.UserGroupInformation;
059import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
060import org.junit.AfterClass;
061import org.junit.Before;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Rule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.junit.rules.ExpectedException;
068import org.junit.runner.RunWith;
069import org.junit.runners.Parameterized;
070import org.junit.runners.Parameterized.Parameter;
071import org.junit.runners.Parameterized.Parameters;
072import org.mockito.Mockito;
073
074import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
075import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
076
077import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
078import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
079
080@RunWith(Parameterized.class)
081@Category({ SecurityTests.class, MediumTests.class })
082public class TestSecureIPC {
083
084  @ClassRule
085  public static final HBaseClassTestRule CLASS_RULE =
086      HBaseClassTestRule.forClass(TestSecureIPC.class);
087
088  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
089
090  private static final File KEYTAB_FILE = new File(
091      TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
092
093  private static MiniKdc KDC;
094  private static String HOST = "localhost";
095  private static String PRINCIPAL;
096
097  String krbKeytab;
098  String krbPrincipal;
099  UserGroupInformation ugi;
100  Configuration clientConf;
101  Configuration serverConf;
102
103  @Rule
104  public ExpectedException exception = ExpectedException.none();
105
106  @Parameters(name = "{index}: rpcClientImpl={0}, rpcServerImpl={1}")
107  public static Collection<Object[]> parameters() {
108    List<Object[]> params = new ArrayList<>();
109    List<String> rpcClientImpls = Arrays.asList(
110        BlockingRpcClient.class.getName(), NettyRpcClient.class.getName());
111    List<String> rpcServerImpls = Arrays.asList(
112        SimpleRpcServer.class.getName(), NettyRpcServer.class.getName());
113    for (String rpcClientImpl : rpcClientImpls) {
114      for (String rpcServerImpl : rpcServerImpls) {
115        params.add(new Object[] { rpcClientImpl, rpcServerImpl });
116      }
117    }
118    return params;
119  }
120
121  @Parameter(0)
122  public String rpcClientImpl;
123
124  @Parameter(1)
125  public String rpcServerImpl;
126
127  @BeforeClass
128  public static void setUp() throws Exception {
129    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
130    PRINCIPAL = "hbase/" + HOST;
131    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
132    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
133  }
134
135  @AfterClass
136  public static void tearDown() throws IOException {
137    if (KDC != null) {
138      KDC.stop();
139    }
140    TEST_UTIL.cleanupTestDir();
141  }
142
143  @Before
144  public void setUpTest() throws Exception {
145    krbKeytab = getKeytabFileForTesting();
146    krbPrincipal = getPrincipalForTesting();
147    ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
148    clientConf = getSecuredConfiguration();
149    clientConf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
150    serverConf = getSecuredConfiguration();
151    serverConf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
152        rpcServerImpl);
153  }
154
155  @Test
156  public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
157    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
158
159    // check that the login user is okay:
160    assertSame(ugi2, ugi);
161    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
162    assertEquals(krbPrincipal, ugi.getUserName());
163
164    callRpcService(User.create(ugi2));
165  }
166
167  @Test
168  public void testRpcFallbackToSimpleAuth() throws Exception {
169    String clientUsername = "testuser";
170    UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
171      new String[] { clientUsername });
172
173    // check that the client user is insecure
174    assertNotSame(ugi, clientUgi);
175    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
176    assertEquals(clientUsername, clientUgi.getUserName());
177
178    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
179    serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
180    callRpcService(User.create(clientUgi));
181  }
182
183  void setRpcProtection(String clientProtection, String serverProtection) {
184    clientConf.set("hbase.rpc.protection", clientProtection);
185    serverConf.set("hbase.rpc.protection", serverProtection);
186  }
187
188  /**
189   * Test various combinations of Server and Client qops.
190   * @throws Exception
191   */
192  @Test
193  public void testSaslWithCommonQop() throws Exception {
194    setRpcProtection("privacy,authentication", "authentication");
195    callRpcService(User.create(ugi));
196
197    setRpcProtection("authentication", "privacy,authentication");
198    callRpcService(User.create(ugi));
199
200    setRpcProtection("integrity,authentication", "privacy,authentication");
201    callRpcService(User.create(ugi));
202
203    setRpcProtection("integrity,authentication", "integrity,authentication");
204    callRpcService(User.create(ugi));
205
206    setRpcProtection("privacy,authentication", "privacy,authentication");
207    callRpcService(User.create(ugi));
208  }
209
210  @Test
211  public void testSaslNoCommonQop() throws Exception {
212    exception.expect(SaslException.class);
213    exception.expectMessage("No common protection layer between client and server");
214    setRpcProtection("integrity", "privacy");
215    callRpcService(User.create(ugi));
216  }
217
218  /**
219   * Test sasl encryption with Crypto AES.
220   * @throws Exception
221   */
222  @Test
223  public void testSaslWithCryptoAES() throws Exception {
224    setRpcProtection("privacy", "privacy");
225    setCryptoAES("true", "true");
226    callRpcService(User.create(ugi));
227  }
228
229  /**
230   * Test various combinations of Server and Client configuration for Crypto AES.
231   * @throws Exception
232   */
233  @Test
234  public void testDifferentConfWithCryptoAES() throws Exception {
235    setRpcProtection("privacy", "privacy");
236
237    setCryptoAES("false", "true");
238    callRpcService(User.create(ugi));
239
240    setCryptoAES("true", "false");
241    try {
242      callRpcService(User.create(ugi));
243      fail("The exception should be thrown out for the rpc timeout.");
244    } catch (Exception e) {
245      // ignore the expected exception
246    }
247  }
248
249  void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
250    clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
251    serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
252  }
253
254  private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
255      throws Exception {
256    Configuration cnf = new Configuration();
257    cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
258    UserGroupInformation.setConfiguration(cnf);
259    UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
260    return UserGroupInformation.getLoginUser();
261  }
262
263  /**
264   * Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
265   * the stub, this function will throw root cause of that exception.
266   */
267  private void callRpcService(User clientUser) throws Exception {
268    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
269    Mockito.when(securityInfoMock.getServerPrincipal())
270        .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
271    SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
272
273    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
274
275    RpcServerInterface rpcServer = RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
276        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)), isa,
277        serverConf, new FifoRpcScheduler(serverConf, 1));
278    rpcServer.start();
279    try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
280      HConstants.DEFAULT_CLUSTER_ID.toString())) {
281      BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(),
282        clientUser);
283      TestThread th1 = new TestThread(stub);
284      final Throwable exception[] = new Throwable[1];
285      Collections.synchronizedList(new ArrayList<Throwable>());
286      Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
287        @Override
288        public void uncaughtException(Thread th, Throwable ex) {
289          exception[0] = ex;
290        }
291      };
292      th1.setUncaughtExceptionHandler(exceptionHandler);
293      th1.start();
294      th1.join();
295      if (exception[0] != null) {
296        // throw root cause.
297        while (exception[0].getCause() != null) {
298          exception[0] = exception[0].getCause();
299        }
300        throw (Exception) exception[0];
301      }
302    } finally {
303      rpcServer.stop();
304    }
305  }
306
307  public static class TestThread extends Thread {
308    private final BlockingInterface stub;
309
310    public TestThread(BlockingInterface stub) {
311      this.stub = stub;
312    }
313
314    @Override
315    public void run() {
316      try {
317        int[] messageSize = new int[] { 100, 1000, 10000 };
318        for (int i = 0; i < messageSize.length; i++) {
319          String input = RandomStringUtils.random(messageSize[i]);
320          String result = stub
321              .echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
322              .getMessage();
323          assertEquals(input, result);
324        }
325      } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
326        throw new RuntimeException(e);
327      }
328    }
329  }
330}