001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.containsString; 022import static org.hamcrest.Matchers.either; 023import static org.hamcrest.Matchers.instanceOf; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertThrows; 026 027import java.io.File; 028import java.io.IOException; 029import java.lang.reflect.UndeclaredThrowableException; 030import java.net.InetSocketAddress; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.List; 035import java.util.stream.Stream; 036import javax.security.sasl.SaslException; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.CommonConfigurationKeys; 039import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.security.SecurityInfo; 044import org.apache.hadoop.hbase.security.User; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.testclassification.SecurityTests; 047import org.apache.hadoop.ipc.RemoteException; 048import org.apache.hadoop.minikdc.MiniKdc; 049import org.apache.hadoop.security.UserGroupInformation; 050import org.junit.jupiter.api.AfterAll; 051import org.junit.jupiter.api.AfterEach; 052import org.junit.jupiter.api.BeforeAll; 053import org.junit.jupiter.api.BeforeEach; 054import org.junit.jupiter.api.Tag; 055import org.junit.jupiter.api.TestTemplate; 056import org.junit.jupiter.params.provider.Arguments; 057 058import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 059import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 060import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 061import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 062import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 063import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException; 064 065import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; 066import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; 068 069/** 070 * Tests for HBASE-28321, where we have multiple server principals candidates for a rpc service. 071 * <p> 072 * Put here just because we need to visit some package private classes under this package. 073 */ 074@Tag(SecurityTests.TAG) 075@Tag(MediumTests.TAG) 076@HBaseParameterizedTestTemplate(name = "{index}: rpcServerImpl={0}, rpcClientImpl={1}") 077public class TestMultipleServerPrincipalsIPC { 078 079 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 080 081 private static final File KEYTAB_FILE = 082 new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); 083 084 private static MiniKdc KDC; 085 private static String HOST = "localhost"; 086 private static String SERVER_PRINCIPAL; 087 private static String SERVER_PRINCIPAL2; 088 private static String CLIENT_PRINCIPAL; 089 090 private final Class<? extends RpcServer> rpcServerImpl; 091 private final Class<? extends RpcClient> rpcClientImpl; 092 093 private Configuration clientConf; 094 private Configuration serverConf; 095 private UserGroupInformation clientUGI; 096 private UserGroupInformation serverUGI; 097 private RpcServer rpcServer; 098 private RpcClient rpcClient; 099 100 public TestMultipleServerPrincipalsIPC(Class<? extends RpcServer> rpcServerImpl, 101 Class<? extends RpcClient> rpcClientImpl) { 102 this.rpcServerImpl = rpcServerImpl; 103 this.rpcClientImpl = rpcClientImpl; 104 } 105 106 public static Stream<Arguments> parameters() { 107 List<Arguments> params = new ArrayList<>(); 108 List<Class<? extends RpcServer>> rpcServerImpls = 109 Arrays.asList(NettyRpcServer.class, SimpleRpcServer.class); 110 List<Class<? extends RpcClient>> rpcClientImpls = 111 Arrays.asList(NettyRpcClient.class, BlockingRpcClient.class); 112 for (Class<? extends RpcServer> rpcServerImpl : rpcServerImpls) { 113 for (Class<? extends RpcClient> rpcClientImpl : rpcClientImpls) { 114 params.add(Arguments.of(rpcServerImpl, rpcClientImpl)); 115 } 116 } 117 return params.stream(); 118 } 119 120 @BeforeAll 121 public static void setUpBeforeClass() throws Exception { 122 KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); 123 SERVER_PRINCIPAL = "server/" + HOST + "@" + KDC.getRealm(); 124 SERVER_PRINCIPAL2 = "server2/" + HOST + "@" + KDC.getRealm(); 125 CLIENT_PRINCIPAL = "client"; 126 KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, SERVER_PRINCIPAL2); 127 setSecuredConfiguration(TEST_UTIL.getConfiguration()); 128 TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 1); 129 TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 0); 130 TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 10); 131 } 132 133 @AfterAll 134 public static void tearDownAfterClass() { 135 if (KDC != null) { 136 KDC.stop(); 137 } 138 } 139 140 private static void setSecuredConfiguration(Configuration conf) { 141 conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); 142 conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos"); 143 conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true); 144 } 145 146 private void loginAndStartRpcServer(String principal, int port) throws Exception { 147 UserGroupInformation.setConfiguration(serverConf); 148 serverUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 149 KEYTAB_FILE.getCanonicalPath()); 150 rpcServer = serverUGI.doAs((PrivilegedExceptionAction< 151 RpcServer>) () -> RpcServerFactory.createRpcServer(null, getClass().getSimpleName(), 152 Lists.newArrayList( 153 new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null)), 154 new InetSocketAddress(HOST, port), serverConf, new FifoRpcScheduler(serverConf, 1))); 155 rpcServer.start(); 156 } 157 158 @BeforeEach 159 public void setUp() throws Exception { 160 clientConf = new Configuration(TEST_UTIL.getConfiguration()); 161 clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl, 162 RpcClient.class); 163 String serverPrincipalConfigName = "hbase.test.multiple.principal.first"; 164 String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second"; 165 clientConf.set(serverPrincipalConfigName, SERVER_PRINCIPAL); 166 clientConf.set(serverPrincipalConfigName2, SERVER_PRINCIPAL2); 167 serverConf = new Configuration(TEST_UTIL.getConfiguration()); 168 serverConf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, 169 RpcServer.class); 170 SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, serverPrincipalConfigName2, 171 serverPrincipalConfigName); 172 SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), securityInfo); 173 174 UserGroupInformation.setConfiguration(clientConf); 175 clientUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL, 176 KEYTAB_FILE.getCanonicalPath()); 177 loginAndStartRpcServer(SERVER_PRINCIPAL, 0); 178 rpcClient = clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> RpcClientFactory 179 .createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())); 180 } 181 182 @AfterEach 183 public void tearDown() throws IOException { 184 Closeables.close(rpcClient, true); 185 rpcServer.stop(); 186 } 187 188 private String echo(String msg) throws Exception { 189 return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> { 190 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( 191 ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), -1), User.getCurrent(), 192 10000); 193 TestProtobufRpcProto.BlockingInterface stub = TestProtobufRpcProto.newBlockingStub(channel); 194 return stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build()) 195 .getMessage(); 196 }); 197 } 198 199 @TestTemplate 200 public void testEcho() throws Exception { 201 String msg = "Hello World"; 202 assertEquals(msg, echo(msg)); 203 } 204 205 @TestTemplate 206 public void testMaliciousServer() throws Exception { 207 // reset the server principals so the principal returned by server does not match 208 SecurityInfo securityInfo = 209 SecurityInfo.getInfo(TestProtobufRpcProto.getDescriptor().getName()); 210 for (int i = 0; i < securityInfo.getServerPrincipals().size(); i++) { 211 clientConf.set(securityInfo.getServerPrincipals().get(i), 212 "valid_server_" + i + "/" + HOST + "@" + KDC.getRealm()); 213 } 214 UndeclaredThrowableException error = 215 assertThrows(UndeclaredThrowableException.class, () -> echo("whatever")); 216 assertThat(error.getCause(), instanceOf(ServiceException.class)); 217 assertThat(error.getCause().getCause(), instanceOf(SaslException.class)); 218 } 219 220 @TestTemplate 221 public void testRememberLastSucceededServerPrincipal() throws Exception { 222 // after this call we will remember the last succeeded server principal 223 assertEquals("a", echo("a")); 224 // shutdown the connection, but does not remove it from pool 225 RpcConnection conn = 226 Iterables.getOnlyElement(((AbstractRpcClient<?>) rpcClient).getConnections().values()); 227 conn.shutdown(); 228 // recreate rpc server with server principal2 229 int port = rpcServer.getListenerAddress().getPort(); 230 rpcServer.stop(); 231 serverUGI.logoutUserFromKeytab(); 232 loginAndStartRpcServer(SERVER_PRINCIPAL2, port); 233 // this time we will still use the remembered server principal, so we will get a sasl exception 234 UndeclaredThrowableException error = 235 assertThrows(UndeclaredThrowableException.class, () -> echo("a")); 236 assertThat(error.getCause(), instanceOf(ServiceException.class)); 237 // created by IPCUtil.wrap, to prepend the server address 238 assertThat(error.getCause().getCause(), instanceOf(IOException.class)); 239 // wraped IPCUtil.toIOE 240 assertThat(error.getCause().getCause().getCause(), instanceOf(IOException.class)); 241 Throwable cause = error.getCause().getCause().getCause().getCause(); 242 // for netty rpc client, it is DecoderException, for blocking rpc client, it is already 243 // RemoteExcetion 244 assertThat(cause, 245 either(instanceOf(DecoderException.class)).or(instanceOf(RemoteException.class))); 246 RemoteException rme; 247 if (!(cause instanceof RemoteException)) { 248 assertThat(cause.getCause(), instanceOf(RemoteException.class)); 249 rme = (RemoteException) cause.getCause(); 250 } else { 251 rme = (RemoteException) cause; 252 } 253 assertEquals(SaslException.class.getName(), rme.getClassName()); 254 // the above failure will clear the remembered server principal, so this time we will get the 255 // correct one. We use retry here just because a failure of sasl negotiation will trigger a 256 // relogin and it may take some time, and for netty based implementation the relogin is async 257 TEST_UTIL.waitFor(10000, () -> { 258 try { 259 echo("a"); 260 } catch (UndeclaredThrowableException e) { 261 Throwable t = e.getCause().getCause(); 262 assertThat(t, instanceOf(IOException.class)); 263 if (!(t instanceof FailedServerException)) { 264 // for netty rpc client 265 assertThat(e.getCause().getMessage(), 266 containsString(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS)); 267 } 268 return false; 269 } 270 return true; 271 }); 272 } 273}