001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.containsString; 022import static org.hamcrest.Matchers.either; 023import static org.hamcrest.Matchers.instanceOf; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertThrows; 026 027import java.io.File; 028import java.io.IOException; 029import java.lang.reflect.UndeclaredThrowableException; 030import java.net.InetSocketAddress; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.List; 035import javax.security.sasl.SaslException; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.CommonConfigurationKeys; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.security.SecurityInfo; 043import org.apache.hadoop.hbase.security.User; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.testclassification.SecurityTests; 046import org.apache.hadoop.ipc.RemoteException; 047import org.apache.hadoop.minikdc.MiniKdc; 048import org.apache.hadoop.security.UserGroupInformation; 049import org.junit.After; 050import org.junit.AfterClass; 051import org.junit.Before; 052import org.junit.BeforeClass; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.junit.runner.RunWith; 057import org.junit.runners.Parameterized; 058import org.junit.runners.Parameterized.Parameter; 059import org.junit.runners.Parameterized.Parameters; 060 061import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 062import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 063import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 064import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 065import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 066import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException; 067 068import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; 069import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; 071 072/** 073 * Tests for HBASE-28321, where we have multiple server principals candidates for a rpc service. 074 * <p> 075 * Put here just because we need to visit some package private classes under this package. 076 */ 077@RunWith(Parameterized.class) 078@Category({ SecurityTests.class, MediumTests.class }) 079public class TestMultipleServerPrincipalsIPC { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestMultipleServerPrincipalsIPC.class); 084 085 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 086 087 private static final File KEYTAB_FILE = 088 new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); 089 090 private static MiniKdc KDC; 091 private static String HOST = "localhost"; 092 private static String SERVER_PRINCIPAL; 093 private static String SERVER_PRINCIPAL2; 094 private static String CLIENT_PRINCIPAL; 095 096 @Parameter(0) 097 public Class<? extends RpcServer> rpcServerImpl; 098 099 @Parameter(1) 100 public Class<? extends RpcClient> rpcClientImpl; 101 102 private Configuration clientConf; 103 private Configuration serverConf; 104 private UserGroupInformation clientUGI; 105 private UserGroupInformation serverUGI; 106 private RpcServer rpcServer; 107 private RpcClient rpcClient; 108 109 @Parameters(name = "{index}: rpcServerImpl={0}, rpcClientImpl={1}") 110 public static List<Object[]> params() { 111 List<Object[]> params = new ArrayList<>(); 112 List<Class<? extends RpcServer>> rpcServerImpls = 113 Arrays.asList(NettyRpcServer.class, SimpleRpcServer.class); 114 List<Class<? extends RpcClient>> rpcClientImpls = 115 Arrays.asList(NettyRpcClient.class, BlockingRpcClient.class); 116 for (Class<? extends RpcServer> rpcServerImpl : rpcServerImpls) { 117 for (Class<? extends RpcClient> rpcClientImpl : rpcClientImpls) { 118 params.add(new Object[] { rpcServerImpl, rpcClientImpl }); 119 } 120 } 121 return params; 122 } 123 124 @BeforeClass 125 public static void setUpBeforeClass() throws Exception { 126 KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); 127 SERVER_PRINCIPAL = "server/" + HOST + "@" + KDC.getRealm(); 128 SERVER_PRINCIPAL2 = "server2/" + HOST + "@" + KDC.getRealm(); 129 CLIENT_PRINCIPAL = "client"; 130 KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, SERVER_PRINCIPAL2); 131 setSecuredConfiguration(TEST_UTIL.getConfiguration()); 132 TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 1); 133 TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 0); 134 TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 10); 135 } 136 137 @AfterClass 138 public static void tearDownAfterClass() { 139 if (KDC != null) { 140 KDC.stop(); 141 } 142 } 143 144 private static void setSecuredConfiguration(Configuration conf) { 145 conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); 146 conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos"); 147 conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true); 148 } 149 150 private void loginAndStartRpcServer(String principal, int port) throws Exception { 151 UserGroupInformation.setConfiguration(serverConf); 152 serverUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 153 KEYTAB_FILE.getCanonicalPath()); 154 rpcServer = serverUGI.doAs((PrivilegedExceptionAction< 155 RpcServer>) () -> RpcServerFactory.createRpcServer(null, getClass().getSimpleName(), 156 Lists.newArrayList( 157 new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null)), 158 new InetSocketAddress(HOST, port), serverConf, new FifoRpcScheduler(serverConf, 1))); 159 rpcServer.start(); 160 } 161 162 @Before 163 public void setUp() throws Exception { 164 clientConf = new Configuration(TEST_UTIL.getConfiguration()); 165 clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl, 166 RpcClient.class); 167 String serverPrincipalConfigName = "hbase.test.multiple.principal.first"; 168 String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second"; 169 clientConf.set(serverPrincipalConfigName, SERVER_PRINCIPAL); 170 clientConf.set(serverPrincipalConfigName2, SERVER_PRINCIPAL2); 171 serverConf = new Configuration(TEST_UTIL.getConfiguration()); 172 serverConf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, 173 RpcServer.class); 174 SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, serverPrincipalConfigName2, 175 serverPrincipalConfigName); 176 SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), securityInfo); 177 178 UserGroupInformation.setConfiguration(clientConf); 179 clientUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL, 180 KEYTAB_FILE.getCanonicalPath()); 181 loginAndStartRpcServer(SERVER_PRINCIPAL, 0); 182 rpcClient = clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> RpcClientFactory 183 .createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())); 184 } 185 186 @After 187 public void tearDown() throws IOException { 188 Closeables.close(rpcClient, true); 189 rpcServer.stop(); 190 } 191 192 private String echo(String msg) throws Exception { 193 return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> { 194 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( 195 ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), -1), User.getCurrent(), 196 10000); 197 TestProtobufRpcProto.BlockingInterface stub = TestProtobufRpcProto.newBlockingStub(channel); 198 return stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build()) 199 .getMessage(); 200 }); 201 } 202 203 @Test 204 public void testEcho() throws Exception { 205 String msg = "Hello World"; 206 assertEquals(msg, echo(msg)); 207 } 208 209 @Test 210 public void testMaliciousServer() throws Exception { 211 // reset the server principals so the principal returned by server does not match 212 SecurityInfo securityInfo = 213 SecurityInfo.getInfo(TestProtobufRpcProto.getDescriptor().getName()); 214 for (int i = 0; i < securityInfo.getServerPrincipals().size(); i++) { 215 clientConf.set(securityInfo.getServerPrincipals().get(i), 216 "valid_server_" + i + "/" + HOST + "@" + KDC.getRealm()); 217 } 218 UndeclaredThrowableException error = 219 assertThrows(UndeclaredThrowableException.class, () -> echo("whatever")); 220 assertThat(error.getCause(), instanceOf(ServiceException.class)); 221 assertThat(error.getCause().getCause(), instanceOf(SaslException.class)); 222 } 223 224 @Test 225 public void testRememberLastSucceededServerPrincipal() throws Exception { 226 // after this call we will remember the last succeeded server principal 227 assertEquals("a", echo("a")); 228 // shutdown the connection, but does not remove it from pool 229 RpcConnection conn = 230 Iterables.getOnlyElement(((AbstractRpcClient<?>) rpcClient).getConnections().values()); 231 conn.shutdown(); 232 // recreate rpc server with server principal2 233 int port = rpcServer.getListenerAddress().getPort(); 234 rpcServer.stop(); 235 serverUGI.logoutUserFromKeytab(); 236 loginAndStartRpcServer(SERVER_PRINCIPAL2, port); 237 // this time we will still use the remembered server principal, so we will get a sasl exception 238 UndeclaredThrowableException error = 239 assertThrows(UndeclaredThrowableException.class, () -> echo("a")); 240 assertThat(error.getCause(), instanceOf(ServiceException.class)); 241 // created by IPCUtil.wrap, to prepend the server address 242 assertThat(error.getCause().getCause(), instanceOf(IOException.class)); 243 // wraped IPCUtil.toIOE 244 assertThat(error.getCause().getCause().getCause(), instanceOf(IOException.class)); 245 Throwable cause = error.getCause().getCause().getCause().getCause(); 246 // for netty rpc client, it is DecoderException, for blocking rpc client, it is already 247 // RemoteExcetion 248 assertThat(cause, 249 either(instanceOf(DecoderException.class)).or(instanceOf(RemoteException.class))); 250 RemoteException rme; 251 if (!(cause instanceof RemoteException)) { 252 assertThat(cause.getCause(), instanceOf(RemoteException.class)); 253 rme = (RemoteException) cause.getCause(); 254 } else { 255 rme = (RemoteException) cause; 256 } 257 assertEquals(SaslException.class.getName(), rme.getClassName()); 258 // the above failure will clear the remembered server principal, so this time we will get the 259 // correct one. We use retry here just because a failure of sasl negotiation will trigger a 260 // relogin and it may take some time, and for netty based implementation the relogin is async 261 TEST_UTIL.waitFor(10000, () -> { 262 try { 263 echo("a"); 264 } catch (UndeclaredThrowableException e) { 265 Throwable t = e.getCause().getCause(); 266 assertThat(t, instanceOf(IOException.class)); 267 if (!(t instanceof FailedServerException)) { 268 // for netty rpc client 269 assertThat(e.getCause().getMessage(), 270 containsString(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS)); 271 } 272 return false; 273 } 274 return true; 275 }); 276 } 277}