001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.DataInput; 021import java.io.EOFException; 022import java.io.IOException; 023import java.io.InputStream; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.net.UnknownHostException; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Map; 031import java.util.Set; 032import java.util.TreeSet; 033import java.util.concurrent.ThreadLocalRandom; 034import java.util.concurrent.TimeUnit; 035import java.util.function.Consumer; 036import javax.security.sasl.SaslException; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.ExtendedCellScanner; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.client.MetricsConnection; 041import org.apache.hadoop.hbase.codec.Codec; 042import org.apache.hadoop.hbase.net.Address; 043import org.apache.hadoop.hbase.security.AuthMethod; 044import org.apache.hadoop.hbase.security.SecurityConstants; 045import org.apache.hadoop.hbase.security.SecurityInfo; 046import org.apache.hadoop.hbase.security.User; 047import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; 048import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.Pair; 052import org.apache.hadoop.io.compress.CompressionCodec; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.hadoop.security.SecurityUtil; 055import org.apache.hadoop.security.token.Token; 056import org.apache.hadoop.security.token.TokenIdentifier; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.protobuf.Message; 062import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 063import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 064import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 065import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 066import org.apache.hbase.thirdparty.io.netty.util.Timeout; 067import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 068 069import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 076 077/** 078 * Base class for ipc connection. 079 */ 080@InterfaceAudience.Private 081abstract class RpcConnection { 082 083 private static final Logger LOG = LoggerFactory.getLogger(RpcConnection.class); 084 085 protected final ConnectionId remoteId; 086 087 protected final boolean useSasl; 088 089 protected final Token<? extends TokenIdentifier> token; 090 091 protected final SecurityInfo securityInfo; 092 093 protected final int reloginMaxBackoff; // max pause before relogin on sasl failure 094 095 protected final Codec codec; 096 097 protected final CompressionCodec compressor; 098 099 protected final CellBlockBuilder cellBlockBuilder; 100 101 protected final MetricsConnection metrics; 102 private final Map<String, byte[]> connectionAttributes; 103 104 protected final HashedWheelTimer timeoutTimer; 105 106 protected final Configuration conf; 107 108 protected static String CRYPTO_AES_ENABLED_KEY = "hbase.rpc.crypto.encryption.aes.enabled"; 109 110 protected static boolean CRYPTO_AES_ENABLED_DEFAULT = false; 111 112 // the last time we were picked up from connection pool. 113 protected long lastTouched; 114 115 protected SaslClientAuthenticationProvider provider; 116 117 // Record the server principal which we have successfully authenticated with the remote server 118 // this is used to save the extra round trip with server when there are multiple candidate server 119 // principals for a given rpc service, like ClientMetaService. 120 // See HBASE-28321 for more details. 121 private String lastSucceededServerPrincipal; 122 123 protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId, 124 String clusterId, boolean isSecurityEnabled, SaslClientAuthenticationProviders providers, 125 Codec codec, CompressionCodec compressor, CellBlockBuilder cellBlockBuilder, 126 MetricsConnection metrics, Map<String, byte[]> connectionAttributes) throws IOException { 127 this.timeoutTimer = timeoutTimer; 128 this.codec = codec; 129 this.compressor = compressor; 130 this.cellBlockBuilder = cellBlockBuilder; 131 this.conf = conf; 132 this.metrics = metrics; 133 this.connectionAttributes = connectionAttributes; 134 User ticket = remoteId.getTicket(); 135 this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName()); 136 this.useSasl = isSecurityEnabled; 137 138 // Choose the correct Token and AuthenticationProvider for this client to use 139 Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair; 140 if (useSasl && securityInfo != null) { 141 pair = providers.selectProvider(clusterId, ticket); 142 if (pair == null) { 143 if (LOG.isTraceEnabled()) { 144 LOG.trace("Found no valid authentication method from providers={} with tokens={}", 145 providers.toString(), ticket.getTokens()); 146 } 147 throw new RuntimeException("Found no valid authentication method from options"); 148 } 149 } else if (!useSasl) { 150 // Hack, while SIMPLE doesn't go via SASL. 151 pair = providers.getSimpleProvider(); 152 } else { 153 throw new RuntimeException("Could not compute valid client authentication provider"); 154 } 155 156 this.provider = pair.getFirst(); 157 this.token = pair.getSecond(); 158 159 LOG.debug("Using {} authentication for service={}, sasl={}", 160 provider.getSaslAuthMethod().getName(), remoteId.serviceName, useSasl); 161 reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000); 162 this.remoteId = remoteId; 163 } 164 165 protected final void scheduleTimeoutTask(final Call call) { 166 if (call.timeout > 0) { 167 call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() { 168 169 @Override 170 public void run(Timeout timeout) throws Exception { 171 call.setTimeout(new CallTimeoutException(call.toShortString() + ", waitTime=" 172 + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + "ms, rpcTimeout=" 173 + call.timeout + "ms")); 174 callTimeout(call); 175 } 176 }, call.timeout, TimeUnit.MILLISECONDS); 177 } 178 } 179 180 // will be overridden in tests 181 protected byte[] getConnectionHeaderPreamble() { 182 // Assemble the preamble up in a buffer first and then send it. Writing individual elements, 183 // they are getting sent across piecemeal according to wireshark and then server is messing 184 // up the reading on occasion (the passed in stream is not buffered yet). 185 int rpcHeaderLen = HConstants.RPC_HEADER.length; 186 // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE 187 byte[] preamble = new byte[rpcHeaderLen + 2]; 188 System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen); 189 preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION; 190 synchronized (this) { 191 preamble[preamble.length - 1] = provider.getSaslAuthMethod().getCode(); 192 } 193 return preamble; 194 } 195 196 private Map<String, byte[]> getConfigurationConnectionAttributes() { 197 Map<String, byte[]> attributes = new HashMap<>(); 198 199 for (Map.Entry<String, String> entry : conf) { 200 String key = entry.getKey(); 201 202 if ( 203 key.startsWith(HConstants.CLIENT_HEADER_PREFIX) && !connectionAttributes.containsKey(key) 204 ) { 205 String value = entry.getValue(); 206 207 LOG.debug("Adding connection header: {}={}", key, value); 208 attributes.put(key, Bytes.toBytes(value)); 209 } 210 } 211 212 return attributes; 213 } 214 215 protected final ConnectionHeader getConnectionHeader() { 216 final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); 217 builder.setServiceName(remoteId.getServiceName()); 218 final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket); 219 if (userInfoPB != null) { 220 builder.setUserInfo(userInfoPB); 221 } 222 if (this.codec != null) { 223 builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName()); 224 } 225 if (this.compressor != null) { 226 builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName()); 227 } 228 if (connectionAttributes != null && !connectionAttributes.isEmpty()) { 229 HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder(); 230 for (Map.Entry<String, byte[]> attribute : connectionAttributes.entrySet()) { 231 attributeBuilder.setName(attribute.getKey()); 232 attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue())); 233 builder.addAttribute(attributeBuilder.build()); 234 } 235 } 236 for (Map.Entry<String, byte[]> attribute : getConfigurationConnectionAttributes().entrySet()) { 237 HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder(); 238 attributeBuilder.setName(attribute.getKey()); 239 attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue())); 240 builder.addAttribute(attributeBuilder.build()); 241 } 242 builder.setVersionInfo(ProtobufUtil.getVersionInfo()); 243 boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT); 244 // if Crypto AES enable, setup Cipher transformation 245 if (isCryptoAESEnable) { 246 builder.setRpcCryptoCipherTransformation( 247 conf.get("hbase.rpc.crypto.encryption.aes.cipher.transform", "AES/CTR/NoPadding")); 248 } 249 return builder.build(); 250 } 251 252 protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics) 253 throws UnknownHostException { 254 if (metrics != null) { 255 metrics.incrNsLookups(); 256 } 257 InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); 258 if (remoteAddr.isUnresolved()) { 259 if (metrics != null) { 260 metrics.incrNsLookupsFailed(); 261 } 262 throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); 263 } 264 return remoteAddr; 265 } 266 267 private static boolean useCanonicalHostname(Configuration conf) { 268 return !conf.getBoolean( 269 SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, 270 SecurityConstants.DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS); 271 } 272 273 private static String getHostnameForServerPrincipal(Configuration conf, InetAddress addr) { 274 final String hostname; 275 if (useCanonicalHostname(conf)) { 276 hostname = addr.getCanonicalHostName(); 277 if (hostname.equals(addr.getHostAddress())) { 278 LOG.warn("Canonical hostname for SASL principal is the same with IP address: " + hostname 279 + ", " + addr.getHostName() + ". Check DNS configuration or consider " 280 + SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS + "=true"); 281 } 282 } else { 283 hostname = addr.getHostName(); 284 } 285 286 return hostname.toLowerCase(); 287 } 288 289 private static String getServerPrincipal(Configuration conf, String serverKey, InetAddress server) 290 throws IOException { 291 String hostname = getHostnameForServerPrincipal(conf, server); 292 return SecurityUtil.getServerPrincipal(conf.get(serverKey), hostname); 293 } 294 295 protected final boolean isKerberosAuth() { 296 return provider.getSaslAuthMethod().getCode() == AuthMethod.KERBEROS.code; 297 } 298 299 protected final Set<String> getServerPrincipals() throws IOException { 300 // for authentication method other than kerberos, we do not need to know the server principal 301 if (!isKerberosAuth()) { 302 return Collections.singleton(HConstants.EMPTY_STRING); 303 } 304 // if we have successfully authenticated last time, just return the server principal we use last 305 // time 306 if (lastSucceededServerPrincipal != null) { 307 return Collections.singleton(lastSucceededServerPrincipal); 308 } 309 InetAddress server = 310 new InetSocketAddress(remoteId.address.getHostName(), remoteId.address.getPort()) 311 .getAddress(); 312 // Even if we have multiple config key in security info, it is still possible that we configured 313 // the same principal for them, so here we use a Set 314 Set<String> serverPrincipals = new TreeSet<>(); 315 for (String serverPrincipalKey : securityInfo.getServerPrincipals()) { 316 serverPrincipals.add(getServerPrincipal(conf, serverPrincipalKey, server)); 317 } 318 return serverPrincipals; 319 } 320 321 protected final <T> T randomSelect(Collection<T> c) { 322 int select = ThreadLocalRandom.current().nextInt(c.size()); 323 int index = 0; 324 for (T t : c) { 325 if (index == select) { 326 return t; 327 } 328 index++; 329 } 330 return null; 331 } 332 333 protected final String chooseServerPrincipal(Set<String> candidates, Call securityPreambleCall) 334 throws SaslException { 335 String principal = 336 ((SecurityPreamableResponse) securityPreambleCall.response).getServerPrincipal(); 337 if (!candidates.contains(principal)) { 338 // this means the server returns principal which is not in our candidates, it could be a 339 // malicious server, stop connecting 340 throw new SaslException(remoteId.address + " tells us to use server principal " + principal 341 + " which is not expected, should be one of " + candidates); 342 } 343 return principal; 344 } 345 346 protected final void saslNegotiationDone(String serverPrincipal, boolean succeed) { 347 LOG.debug("sasl negotiation done with serverPrincipal = {}, succeed = {}", serverPrincipal, 348 succeed); 349 if (succeed) { 350 this.lastSucceededServerPrincipal = serverPrincipal; 351 } else { 352 // clear the recorded principal if authentication failed 353 this.lastSucceededServerPrincipal = null; 354 } 355 } 356 357 protected abstract void callTimeout(Call call); 358 359 public ConnectionId remoteId() { 360 return remoteId; 361 } 362 363 public long getLastTouched() { 364 return lastTouched; 365 } 366 367 public void setLastTouched(long lastTouched) { 368 this.lastTouched = lastTouched; 369 } 370 371 /** 372 * Tell the idle connection sweeper whether we could be swept. 373 */ 374 public abstract boolean isActive(); 375 376 /** 377 * Just close connection. Do not need to remove from connection pool. 378 */ 379 public abstract void shutdown(); 380 381 public abstract void sendRequest(Call call, HBaseRpcController hrc) throws IOException; 382 383 /** 384 * Does the clean up work after the connection is removed from the connection pool 385 */ 386 public abstract void cleanupConnection(); 387 388 protected final Call createSecurityPreambleCall(RpcCallback<Call> callback) { 389 return new Call(-1, null, null, null, SecurityPreamableResponse.getDefaultInstance(), 0, 0, 390 Collections.emptyMap(), callback, MetricsConnection.newCallStats()); 391 } 392 393 private <T extends InputStream & DataInput> void finishCall(ResponseHeader responseHeader, T in, 394 Call call) throws IOException { 395 Message value; 396 if (call.responseDefaultType != null) { 397 Message.Builder builder = call.responseDefaultType.newBuilderForType(); 398 if (!builder.mergeDelimitedFrom(in)) { 399 // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF 400 // before reading any bytes out, so here we need to manually finish create the EOFException 401 // and finish the call 402 call.setException(new EOFException("EOF while reading response with type: " 403 + call.responseDefaultType.getClass().getName())); 404 return; 405 } 406 value = builder.build(); 407 } else { 408 value = null; 409 } 410 ExtendedCellScanner cellBlockScanner; 411 if (responseHeader.hasCellBlockMeta()) { 412 int size = responseHeader.getCellBlockMeta().getLength(); 413 // Maybe we could read directly from the ByteBuf. 414 // The problem here is that we do not know when to release it. 415 byte[] cellBlock = new byte[size]; 416 in.readFully(cellBlock); 417 cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); 418 } else { 419 cellBlockScanner = null; 420 } 421 call.setResponse(value, cellBlockScanner); 422 } 423 424 <T extends InputStream & DataInput> void readResponse(T in, Map<Integer, Call> id2Call, 425 Call preambleCall, Consumer<RemoteException> fatalConnectionErrorConsumer) throws IOException { 426 int totalSize = in.readInt(); 427 ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); 428 int id = responseHeader.getCallId(); 429 if (LOG.isTraceEnabled()) { 430 LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader) 431 + ", totalSize: " + totalSize + " bytes"); 432 } 433 RemoteException remoteExc; 434 if (responseHeader.hasException()) { 435 ExceptionResponse exceptionResponse = responseHeader.getException(); 436 remoteExc = IPCUtil.createRemoteException(exceptionResponse); 437 if (IPCUtil.isFatalConnectionException(exceptionResponse)) { 438 // Here we will cleanup all calls so do not need to fall back, just return. 439 fatalConnectionErrorConsumer.accept(remoteExc); 440 if (preambleCall != null) { 441 preambleCall.setException(remoteExc); 442 } 443 return; 444 } 445 } else { 446 remoteExc = null; 447 } 448 if (id < 0) { 449 LOG.debug("process preamble call response with response type {}", 450 preambleCall != null 451 ? preambleCall.responseDefaultType.getDescriptorForType().getName() 452 : "null"); 453 if (preambleCall == null) { 454 // fall through so later we will skip this response 455 LOG.warn("Got a negative call id {} but there is no preamble call", id); 456 } else { 457 if (remoteExc != null) { 458 preambleCall.setException(remoteExc); 459 } else { 460 finishCall(responseHeader, in, preambleCall); 461 } 462 return; 463 } 464 } 465 Call call = id2Call.remove(id); 466 if (call == null) { 467 // So we got a response for which we have no corresponding 'call' here on the client-side. 468 // We probably timed out waiting, cleaned up all references, and now the server decides 469 // to return a response. There is nothing we can do w/ the response at this stage. Clean 470 // out the wire of the response so its out of the way and we can get other responses on 471 // this connection. 472 if (LOG.isDebugEnabled()) { 473 int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); 474 int whatIsLeftToRead = totalSize - readSoFar; 475 LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead 476 + " bytes"); 477 } 478 return; 479 } 480 call.callStats.setResponseSizeBytes(totalSize); 481 if (remoteExc != null) { 482 call.setException(remoteExc); 483 return; 484 } 485 try { 486 finishCall(responseHeader, in, call); 487 } catch (IOException e) { 488 // As the call has been removed from id2Call map, if we hit an exception here, the 489 // exceptionCaught method can not help us finish the call, so here we need to catch the 490 // exception and finish it 491 call.setException(e); 492 // throw the exception out, the upper layer should determine whether this is a critical 493 // problem 494 throw e; 495 } 496 } 497}