001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.containsString;
022import static org.hamcrest.Matchers.either;
023import static org.hamcrest.Matchers.instanceOf;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertThrows;
026
027import java.io.File;
028import java.io.IOException;
029import java.lang.reflect.UndeclaredThrowableException;
030import java.net.InetSocketAddress;
031import java.security.PrivilegedExceptionAction;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.List;
035import java.util.stream.Stream;
036import javax.security.sasl.SaslException;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.CommonConfigurationKeys;
039import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.security.SecurityInfo;
044import org.apache.hadoop.hbase.security.User;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.testclassification.SecurityTests;
047import org.apache.hadoop.ipc.RemoteException;
048import org.apache.hadoop.minikdc.MiniKdc;
049import org.apache.hadoop.security.UserGroupInformation;
050import org.junit.jupiter.api.AfterAll;
051import org.junit.jupiter.api.AfterEach;
052import org.junit.jupiter.api.BeforeAll;
053import org.junit.jupiter.api.BeforeEach;
054import org.junit.jupiter.api.Tag;
055import org.junit.jupiter.api.TestTemplate;
056import org.junit.jupiter.params.provider.Arguments;
057
058import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
059import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
060import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
061import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
062import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
063import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException;
064
065import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
066import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
068
069/**
070 * Tests for HBASE-28321, where we have multiple server principals candidates for a rpc service.
071 * <p>
072 * Put here just because we need to visit some package private classes under this package.
073 */
074@Tag(SecurityTests.TAG)
075@Tag(MediumTests.TAG)
076@HBaseParameterizedTestTemplate(name = "{index}: rpcServerImpl={0}, rpcClientImpl={1}")
077public class TestMultipleServerPrincipalsIPC {
078
079  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
080
081  private static final File KEYTAB_FILE =
082    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
083
084  private static MiniKdc KDC;
085  private static String HOST = "localhost";
086  private static String SERVER_PRINCIPAL;
087  private static String SERVER_PRINCIPAL2;
088  private static String CLIENT_PRINCIPAL;
089
090  private final Class<? extends RpcServer> rpcServerImpl;
091  private final Class<? extends RpcClient> rpcClientImpl;
092
093  private Configuration clientConf;
094  private Configuration serverConf;
095  private UserGroupInformation clientUGI;
096  private UserGroupInformation serverUGI;
097  private RpcServer rpcServer;
098  private RpcClient rpcClient;
099
100  public TestMultipleServerPrincipalsIPC(Class<? extends RpcServer> rpcServerImpl,
101    Class<? extends RpcClient> rpcClientImpl) {
102    this.rpcServerImpl = rpcServerImpl;
103    this.rpcClientImpl = rpcClientImpl;
104  }
105
106  public static Stream<Arguments> parameters() {
107    List<Arguments> params = new ArrayList<>();
108    List<Class<? extends RpcServer>> rpcServerImpls =
109      Arrays.asList(NettyRpcServer.class, SimpleRpcServer.class);
110    List<Class<? extends RpcClient>> rpcClientImpls =
111      Arrays.asList(NettyRpcClient.class, BlockingRpcClient.class);
112    for (Class<? extends RpcServer> rpcServerImpl : rpcServerImpls) {
113      for (Class<? extends RpcClient> rpcClientImpl : rpcClientImpls) {
114        params.add(Arguments.of(rpcServerImpl, rpcClientImpl));
115      }
116    }
117    return params.stream();
118  }
119
120  @BeforeAll
121  public static void setUpBeforeClass() throws Exception {
122    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
123    SERVER_PRINCIPAL = "server/" + HOST + "@" + KDC.getRealm();
124    SERVER_PRINCIPAL2 = "server2/" + HOST + "@" + KDC.getRealm();
125    CLIENT_PRINCIPAL = "client";
126    KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, SERVER_PRINCIPAL2);
127    setSecuredConfiguration(TEST_UTIL.getConfiguration());
128    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 1);
129    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 0);
130    TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 10);
131  }
132
133  @AfterAll
134  public static void tearDownAfterClass() {
135    if (KDC != null) {
136      KDC.stop();
137    }
138  }
139
140  private static void setSecuredConfiguration(Configuration conf) {
141    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
142    conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos");
143    conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
144  }
145
146  private void loginAndStartRpcServer(String principal, int port) throws Exception {
147    UserGroupInformation.setConfiguration(serverConf);
148    serverUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal,
149      KEYTAB_FILE.getCanonicalPath());
150    rpcServer = serverUGI.doAs((PrivilegedExceptionAction<
151      RpcServer>) () -> RpcServerFactory.createRpcServer(null, getClass().getSimpleName(),
152        Lists.newArrayList(
153          new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null)),
154        new InetSocketAddress(HOST, port), serverConf, new FifoRpcScheduler(serverConf, 1)));
155    rpcServer.start();
156  }
157
158  @BeforeEach
159  public void setUp() throws Exception {
160    clientConf = new Configuration(TEST_UTIL.getConfiguration());
161    clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl,
162      RpcClient.class);
163    String serverPrincipalConfigName = "hbase.test.multiple.principal.first";
164    String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second";
165    clientConf.set(serverPrincipalConfigName, SERVER_PRINCIPAL);
166    clientConf.set(serverPrincipalConfigName2, SERVER_PRINCIPAL2);
167    serverConf = new Configuration(TEST_UTIL.getConfiguration());
168    serverConf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl,
169      RpcServer.class);
170    SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, serverPrincipalConfigName2,
171      serverPrincipalConfigName);
172    SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), securityInfo);
173
174    UserGroupInformation.setConfiguration(clientConf);
175    clientUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL,
176      KEYTAB_FILE.getCanonicalPath());
177    loginAndStartRpcServer(SERVER_PRINCIPAL, 0);
178    rpcClient = clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> RpcClientFactory
179      .createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString()));
180  }
181
182  @AfterEach
183  public void tearDown() throws IOException {
184    Closeables.close(rpcClient, true);
185    rpcServer.stop();
186  }
187
188  private String echo(String msg) throws Exception {
189    return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> {
190      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
191        ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), -1), User.getCurrent(),
192        10000);
193      TestProtobufRpcProto.BlockingInterface stub = TestProtobufRpcProto.newBlockingStub(channel);
194      return stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build())
195        .getMessage();
196    });
197  }
198
199  @TestTemplate
200  public void testEcho() throws Exception {
201    String msg = "Hello World";
202    assertEquals(msg, echo(msg));
203  }
204
205  @TestTemplate
206  public void testMaliciousServer() throws Exception {
207    // reset the server principals so the principal returned by server does not match
208    SecurityInfo securityInfo =
209      SecurityInfo.getInfo(TestProtobufRpcProto.getDescriptor().getName());
210    for (int i = 0; i < securityInfo.getServerPrincipals().size(); i++) {
211      clientConf.set(securityInfo.getServerPrincipals().get(i),
212        "valid_server_" + i + "/" + HOST + "@" + KDC.getRealm());
213    }
214    UndeclaredThrowableException error =
215      assertThrows(UndeclaredThrowableException.class, () -> echo("whatever"));
216    assertThat(error.getCause(), instanceOf(ServiceException.class));
217    assertThat(error.getCause().getCause(), instanceOf(SaslException.class));
218  }
219
220  @TestTemplate
221  public void testRememberLastSucceededServerPrincipal() throws Exception {
222    // after this call we will remember the last succeeded server principal
223    assertEquals("a", echo("a"));
224    // shutdown the connection, but does not remove it from pool
225    RpcConnection conn =
226      Iterables.getOnlyElement(((AbstractRpcClient<?>) rpcClient).getConnections().values());
227    conn.shutdown();
228    // recreate rpc server with server principal2
229    int port = rpcServer.getListenerAddress().getPort();
230    rpcServer.stop();
231    serverUGI.logoutUserFromKeytab();
232    loginAndStartRpcServer(SERVER_PRINCIPAL2, port);
233    // this time we will still use the remembered server principal, so we will get a sasl exception
234    UndeclaredThrowableException error =
235      assertThrows(UndeclaredThrowableException.class, () -> echo("a"));
236    assertThat(error.getCause(), instanceOf(ServiceException.class));
237    // created by IPCUtil.wrap, to prepend the server address
238    assertThat(error.getCause().getCause(), instanceOf(IOException.class));
239    // wraped IPCUtil.toIOE
240    assertThat(error.getCause().getCause().getCause(), instanceOf(IOException.class));
241    Throwable cause = error.getCause().getCause().getCause().getCause();
242    // for netty rpc client, it is DecoderException, for blocking rpc client, it is already
243    // RemoteExcetion
244    assertThat(cause,
245      either(instanceOf(DecoderException.class)).or(instanceOf(RemoteException.class)));
246    RemoteException rme;
247    if (!(cause instanceof RemoteException)) {
248      assertThat(cause.getCause(), instanceOf(RemoteException.class));
249      rme = (RemoteException) cause.getCause();
250    } else {
251      rme = (RemoteException) cause;
252    }
253    assertEquals(SaslException.class.getName(), rme.getClassName());
254    // the above failure will clear the remembered server principal, so this time we will get the
255    // correct one. We use retry here just because a failure of sasl negotiation will trigger a
256    // relogin and it may take some time, and for netty based implementation the relogin is async
257    TEST_UTIL.waitFor(10000, () -> {
258      try {
259        echo("a");
260      } catch (UndeclaredThrowableException e) {
261        Throwable t = e.getCause().getCause();
262        assertThat(t, instanceOf(IOException.class));
263        if (!(t instanceof FailedServerException)) {
264          // for netty rpc client
265          assertThat(e.getCause().getMessage(),
266            containsString(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS));
267        }
268        return false;
269      }
270      return true;
271    });
272  }
273}