001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER; 021 022import io.opentelemetry.api.GlobalOpenTelemetry; 023import io.opentelemetry.api.trace.Span; 024import io.opentelemetry.context.Context; 025import io.opentelemetry.context.Scope; 026import io.opentelemetry.context.propagation.TextMapGetter; 027import java.io.Closeable; 028import java.io.DataOutputStream; 029import java.io.IOException; 030import java.net.InetAddress; 031import java.net.InetSocketAddress; 032import java.nio.ByteBuffer; 033import java.security.GeneralSecurityException; 034import java.security.cert.X509Certificate; 035import java.util.Collections; 036import java.util.Map; 037import java.util.Objects; 038import java.util.Properties; 039import org.apache.commons.crypto.cipher.CryptoCipherFactory; 040import org.apache.commons.crypto.random.CryptoRandom; 041import org.apache.commons.crypto.random.CryptoRandomFactory; 042import org.apache.hadoop.hbase.DoNotRetryIOException; 043import org.apache.hadoop.hbase.ExtendedCellScanner; 044import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint; 045import org.apache.hadoop.hbase.client.VersionInfoUtil; 046import org.apache.hadoop.hbase.codec.Codec; 047import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 048import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 049import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 050import org.apache.hadoop.hbase.nio.ByteBuff; 051import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; 052import org.apache.hadoop.hbase.security.AccessDeniedException; 053import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; 054import org.apache.hadoop.hbase.security.SaslStatus; 055import org.apache.hadoop.hbase.security.SaslUtil; 056import org.apache.hadoop.hbase.security.User; 057import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider; 058import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider; 059import org.apache.hadoop.hbase.trace.TraceUtil; 060import org.apache.hadoop.hbase.util.ByteBufferUtils; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.Pair; 063import org.apache.hadoop.io.IntWritable; 064import org.apache.hadoop.io.Writable; 065import org.apache.hadoop.io.WritableUtils; 066import org.apache.hadoop.io.compress.CompressionCodec; 067import org.apache.hadoop.security.UserGroupInformation; 068import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 069import org.apache.hadoop.security.authorize.AuthorizationException; 070import org.apache.hadoop.security.authorize.ProxyUsers; 071import org.apache.yetus.audience.InterfaceAudience; 072 073import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 074import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 075import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; 076import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 077import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 078import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 079import org.apache.hbase.thirdparty.com.google.protobuf.Message; 080import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 081import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 082 083import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; 094 095/** Reads calls from a connection and queues them for handling. */ 096@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", 097 justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") 098@InterfaceAudience.Private 099abstract class ServerRpcConnection implements Closeable { 100 101 private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter(); 102 103 protected final RpcServer rpcServer; 104 // If the connection header has been read or not. 105 protected boolean connectionHeaderRead = false; 106 107 protected CallCleanup callCleanup; 108 109 // Cache the remote host & port info so that even if the socket is 110 // disconnected, we can say where it used to connect to. 111 protected String hostAddress; 112 protected int remotePort; 113 protected InetAddress addr; 114 protected ConnectionHeader connectionHeader; 115 protected Map<String, byte[]> connectionAttributes; 116 117 /** 118 * Codec the client asked use. 119 */ 120 protected Codec codec; 121 /** 122 * Compression codec the client asked us use. 123 */ 124 protected CompressionCodec compressionCodec; 125 protected BlockingService service; 126 127 protected SaslServerAuthenticationProvider provider; 128 protected boolean skipInitialSaslHandshake; 129 protected boolean useSasl; 130 protected HBaseSaslRpcServer saslServer; 131 132 // was authentication allowed with a fallback to simple auth 133 protected boolean authenticatedWithFallback; 134 135 protected boolean retryImmediatelySupported = false; 136 137 protected User user = null; 138 protected UserGroupInformation ugi = null; 139 protected X509Certificate[] clientCertificateChain = null; 140 141 public ServerRpcConnection(RpcServer rpcServer) { 142 this.rpcServer = rpcServer; 143 this.callCleanup = null; 144 } 145 146 @Override 147 public String toString() { 148 return getHostAddress() + ":" + remotePort; 149 } 150 151 public String getHostAddress() { 152 return hostAddress; 153 } 154 155 public InetAddress getHostInetAddress() { 156 return addr; 157 } 158 159 public int getRemotePort() { 160 return remotePort; 161 } 162 163 public VersionInfo getVersionInfo() { 164 if (connectionHeader != null && connectionHeader.hasVersionInfo()) { 165 return connectionHeader.getVersionInfo(); 166 } 167 return null; 168 } 169 170 private String getFatalConnectionString(final int version, final byte authByte) { 171 return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version 172 + ", authMethod=" + authByte + 173 // The provider may be null if we failed to parse the header of the request 174 ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName()) 175 + " from " + toString(); 176 } 177 178 /** 179 * Set up cell block codecs 180 */ 181 private void setupCellBlockCodecs() throws FatalConnectionException { 182 // TODO: Plug in other supported decoders. 183 if (!connectionHeader.hasCellBlockCodecClass()) { 184 return; 185 } 186 String className = connectionHeader.getCellBlockCodecClass(); 187 if (className == null || className.length() == 0) { 188 return; 189 } 190 try { 191 this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance(); 192 } catch (Exception e) { 193 throw new UnsupportedCellCodecException(className, e); 194 } 195 if (!connectionHeader.hasCellBlockCompressorClass()) { 196 return; 197 } 198 className = connectionHeader.getCellBlockCompressorClass(); 199 try { 200 this.compressionCodec = 201 (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance(); 202 } catch (Exception e) { 203 throw new UnsupportedCompressionCodecException(className, e); 204 } 205 } 206 207 /** 208 * Set up cipher for rpc encryption with Apache Commons Crypto. 209 */ 210 private Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> setupCryptoCipher() 211 throws FatalConnectionException { 212 // If simple auth, return 213 if (saslServer == null) { 214 return null; 215 } 216 // check if rpc encryption with Crypto AES 217 String qop = saslServer.getNegotiatedQop(); 218 boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop); 219 boolean isCryptoAesEncryption = isEncryption 220 && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false); 221 if (!isCryptoAesEncryption) { 222 return null; 223 } 224 if (!connectionHeader.hasRpcCryptoCipherTransformation()) { 225 return null; 226 } 227 String transformation = connectionHeader.getRpcCryptoCipherTransformation(); 228 if (transformation == null || transformation.length() == 0) { 229 return null; 230 } 231 // Negotiates AES based on complete saslServer. 232 // The Crypto metadata need to be encrypted and send to client. 233 Properties properties = new Properties(); 234 // the property for SecureRandomFactory 235 properties.setProperty(CryptoRandomFactory.CLASSES_KEY, 236 this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random", 237 "org.apache.commons.crypto.random.JavaCryptoRandom")); 238 // the property for cipher class 239 properties.setProperty(CryptoCipherFactory.CLASSES_KEY, 240 this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class", 241 "org.apache.commons.crypto.cipher.JceCipher")); 242 243 int cipherKeyBits = 244 this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128); 245 // generate key and iv 246 if (cipherKeyBits % 8 != 0) { 247 throw new IllegalArgumentException( 248 "The AES cipher key size in bits" + " should be a multiple of byte"); 249 } 250 int len = cipherKeyBits / 8; 251 byte[] inKey = new byte[len]; 252 byte[] outKey = new byte[len]; 253 byte[] inIv = new byte[len]; 254 byte[] outIv = new byte[len]; 255 256 CryptoAES cryptoAES; 257 try { 258 // generate the cipher meta data with SecureRandom 259 CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); 260 secureRandom.nextBytes(inKey); 261 secureRandom.nextBytes(outKey); 262 secureRandom.nextBytes(inIv); 263 secureRandom.nextBytes(outIv); 264 265 // create CryptoAES for server 266 cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv); 267 } catch (GeneralSecurityException | IOException ex) { 268 throw new UnsupportedCryptoException(ex.getMessage(), ex); 269 } 270 // create SaslCipherMeta and send to client, 271 // for client, the [inKey, outKey], [inIv, outIv] should be reversed 272 RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder(); 273 ccmBuilder.setTransformation(transformation); 274 ccmBuilder.setInIv(getByteString(outIv)); 275 ccmBuilder.setInKey(getByteString(outKey)); 276 ccmBuilder.setOutIv(getByteString(inIv)); 277 ccmBuilder.setOutKey(getByteString(inKey)); 278 RPCProtos.ConnectionHeaderResponse resp = 279 RPCProtos.ConnectionHeaderResponse.newBuilder().setCryptoCipherMeta(ccmBuilder).build(); 280 return Pair.newPair(resp, cryptoAES); 281 } 282 283 private ByteString getByteString(byte[] bytes) { 284 // return singleton to reduce object allocation 285 return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); 286 } 287 288 private UserGroupInformation createUser(ConnectionHeader head) { 289 UserGroupInformation ugi = null; 290 291 if (!head.hasUserInfo()) { 292 return null; 293 } 294 UserInformation userInfoProto = head.getUserInfo(); 295 String effectiveUser = null; 296 if (userInfoProto.hasEffectiveUser()) { 297 effectiveUser = userInfoProto.getEffectiveUser(); 298 } 299 String realUser = null; 300 if (userInfoProto.hasRealUser()) { 301 realUser = userInfoProto.getRealUser(); 302 } 303 if (effectiveUser != null) { 304 if (realUser != null) { 305 UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser); 306 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi); 307 } else { 308 ugi = UserGroupInformation.createRemoteUser(effectiveUser); 309 } 310 } 311 return ugi; 312 } 313 314 protected final void disposeSasl() { 315 if (saslServer != null) { 316 saslServer.dispose(); 317 saslServer = null; 318 } 319 } 320 321 /** 322 * No protobuf encoding of raw sasl messages 323 */ 324 protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass, 325 String error) throws IOException { 326 BufferChain bc; 327 // In my testing, have noticed that sasl messages are usually 328 // in the ballpark of 100-200. That's why the initial capacity is 256. 329 try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256); 330 DataOutputStream out = new DataOutputStream(saslResponse)) { 331 out.writeInt(status.state); // write status 332 if (status == SaslStatus.SUCCESS) { 333 rv.write(out); 334 } else { 335 WritableUtils.writeString(out, errorClass); 336 WritableUtils.writeString(out, error); 337 } 338 bc = new BufferChain(saslResponse.getByteBuffer()); 339 } 340 doRespond(() -> bc); 341 } 342 343 HBaseSaslRpcServer getOrCreateSaslServer() throws IOException { 344 if (saslServer == null) { 345 saslServer = new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager); 346 } 347 return saslServer; 348 } 349 350 void finishSaslNegotiation() throws IOException { 351 String negotiatedQop = saslServer.getNegotiatedQop(); 352 SaslUtil.verifyNegotiatedQop(saslServer.getRequestedQop(), negotiatedQop); 353 ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager); 354 RpcServer.LOG.debug( 355 "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi, 356 negotiatedQop); 357 rpcServer.metrics.authenticationSuccess(); 358 RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); 359 } 360 361 public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException { 362 if (connectionHeaderRead) { 363 processRequest(buf); 364 } else { 365 processConnectionHeader(buf); 366 callCleanupIfNeeded(); 367 this.connectionHeaderRead = true; 368 this.rpcServer.getRpcCoprocessorHost().preAuthorizeConnection(connectionHeader, addr); 369 if (rpcServer.needAuthorization() && !authorizeConnection()) { 370 // Throw FatalConnectionException wrapping ACE so client does right thing and closes 371 // down the connection instead of trying to read non-existent retun. 372 throw new AccessDeniedException("Connection from " + this + " for service " 373 + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi); 374 } 375 this.user = this.rpcServer.userProvider.create(this.ugi); 376 this.rpcServer.getRpcCoprocessorHost().postAuthorizeConnection( 377 this.user != null ? this.user.getName() : null, this.clientCertificateChain); 378 } 379 } 380 381 private boolean authorizeConnection() throws IOException { 382 try { 383 // If auth method is DIGEST, the token was obtained by the 384 // real user for the effective user, therefore not required to 385 // authorize real user. doAs is allowed only for simple or kerberos 386 // authentication 387 if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) { 388 ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf); 389 } 390 this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress()); 391 this.rpcServer.metrics.authorizationSuccess(); 392 } catch (AuthorizationException ae) { 393 if (RpcServer.LOG.isDebugEnabled()) { 394 RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae); 395 } 396 this.rpcServer.metrics.authorizationFailure(); 397 doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae))); 398 return false; 399 } 400 return true; 401 } 402 403 private CodedInputStream createCis(ByteBuff buf) { 404 // Here we read in the header. We avoid having pb 405 // do its default 4k allocation for CodedInputStream. We force it to use 406 // backing array. 407 CodedInputStream cis; 408 if (buf.hasArray()) { 409 cis = UnsafeByteOperations 410 .unsafeWrap(buf.array(), buf.arrayOffset() + buf.position(), buf.limit()).newCodedInput(); 411 } else { 412 cis = UnsafeByteOperations.unsafeWrap(new ByteBuffByteInput(buf, buf.limit()), 0, buf.limit()) 413 .newCodedInput(); 414 } 415 cis.enableAliasing(true); 416 return cis; 417 } 418 419 // Reads the connection header following version 420 private void processConnectionHeader(ByteBuff buf) throws IOException { 421 this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf)); 422 423 // we want to copy the attributes prior to releasing the buffer so that they don't get corrupted 424 // eventually 425 if (connectionHeader.getAttributeList().isEmpty()) { 426 this.connectionAttributes = Collections.emptyMap(); 427 } else { 428 this.connectionAttributes = 429 Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size()); 430 for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) { 431 this.connectionAttributes.put(nameBytesPair.getName(), 432 nameBytesPair.getValue().toByteArray()); 433 } 434 } 435 String serviceName = connectionHeader.getServiceName(); 436 if (serviceName == null) { 437 throw new EmptyServiceNameException(); 438 } 439 this.service = RpcServer.getService(this.rpcServer.services, serviceName); 440 if (this.service == null) { 441 throw new UnknownServiceException(serviceName); 442 } 443 setupCellBlockCodecs(); 444 sendConnectionHeaderResponseIfNeeded(); 445 UserGroupInformation protocolUser = createUser(connectionHeader); 446 if (!useSasl) { 447 ugi = protocolUser; 448 if (ugi != null) { 449 ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE); 450 } 451 // audit logging for SASL authenticated users happens in saslReadAndProcess() 452 if (authenticatedWithFallback) { 453 RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi, 454 getHostAddress()); 455 } 456 } else { 457 // user is authenticated 458 ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod()); 459 // Now we check if this is a proxy user case. If the protocol user is 460 // different from the 'user', it is a proxy user scenario. However, 461 // this is not allowed if user authenticated with DIGEST. 462 if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) { 463 if (!provider.supportsProtocolAuthentication()) { 464 // Not allowed to doAs if token authentication is used 465 throw new AccessDeniedException("Authenticated user (" + ugi 466 + ") doesn't match what the client claims to be (" + protocolUser + ")"); 467 } else { 468 // Effective user can be different from authenticated user 469 // for simple auth or kerberos auth 470 // The user is the real user. Now we create a proxy user 471 UserGroupInformation realUser = ugi; 472 ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser); 473 // Now the user is a proxy user, set Authentication method Proxy. 474 ugi.setAuthenticationMethod(AuthenticationMethod.PROXY); 475 } 476 } 477 } 478 String version; 479 if (this.connectionHeader.hasVersionInfo()) { 480 // see if this connection will support RetryImmediatelyException 481 this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2); 482 version = this.connectionHeader.getVersionInfo().getVersion(); 483 } else { 484 version = "UNKNOWN"; 485 } 486 RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}", 487 this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName); 488 } 489 490 /** 491 * Send the response for connection header 492 */ 493 private void sendConnectionHeaderResponseIfNeeded() throws FatalConnectionException { 494 Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> pair = setupCryptoCipher(); 495 // Response the connection header if Crypto AES is enabled 496 if (pair == null) { 497 return; 498 } 499 try { 500 int size = pair.getFirst().getSerializedSize(); 501 BufferChain bc; 502 try (ByteBufferOutputStream bbOut = new ByteBufferOutputStream(4 + size); 503 DataOutputStream out = new DataOutputStream(bbOut)) { 504 out.writeInt(size); 505 pair.getFirst().writeTo(out); 506 bc = new BufferChain(bbOut.getByteBuffer()); 507 } 508 doRespond(new RpcResponse() { 509 510 @Override 511 public BufferChain getResponse() { 512 return bc; 513 } 514 515 @Override 516 public void done() { 517 // must switch after sending the connection header response, as the client still uses the 518 // original SaslClient to unwrap the data we send back 519 saslServer.switchToCryptoAES(pair.getSecond()); 520 } 521 }); 522 } catch (IOException ex) { 523 throw new UnsupportedCryptoException(ex.getMessage(), ex); 524 } 525 } 526 527 protected abstract void doRespond(RpcResponse resp) throws IOException; 528 529 /** 530 * Has the request header and the request param and optionally encoded data buffer all in this one 531 * array. 532 * <p/> 533 * Will be overridden in tests. 534 */ 535 protected void processRequest(ByteBuff buf) throws IOException, InterruptedException { 536 long totalRequestSize = buf.limit(); 537 int offset = 0; 538 // Here we read in the header. We avoid having pb 539 // do its default 4k allocation for CodedInputStream. We force it to use 540 // backing array. 541 CodedInputStream cis = createCis(buf); 542 int headerSize = cis.readRawVarint32(); 543 offset = cis.getTotalBytesRead(); 544 Message.Builder builder = RequestHeader.newBuilder(); 545 ProtobufUtil.mergeFrom(builder, cis, headerSize); 546 RequestHeader header = (RequestHeader) builder.build(); 547 offset += headerSize; 548 Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() 549 .extract(Context.current(), header.getTraceInfo(), getter); 550 551 // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope 552 // are early-exits due to error cases. There's only one success path, the asynchronous call to 553 // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented 554 // by null-ing out the reference in this scope. All other paths end the span. Thus, and in 555 // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally 556 // block iff the span is non-null. 557 Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx); 558 try (Scope ignored = span.makeCurrent()) { 559 int id = header.getCallId(); 560 // HBASE-28128 - if server is aborting, don't bother trying to process. It will 561 // fail at the handler layer, but worse might result in CallQueueTooBigException if the 562 // queue is full but server is not properly processing requests. Better to throw an aborted 563 // exception here so that the client can properly react. 564 if (rpcServer.server != null && rpcServer.server.isAborted()) { 565 RegionServerAbortedException serverIsAborted = new RegionServerAbortedException( 566 "Server " + rpcServer.server.getServerName() + " aborting"); 567 this.rpcServer.metrics.exception(serverIsAborted); 568 sendErrorResponseForCall(id, totalRequestSize, span, serverIsAborted.getMessage(), 569 serverIsAborted); 570 return; 571 } 572 573 if (RpcServer.LOG.isTraceEnabled()) { 574 RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) 575 + " totalRequestSize: " + totalRequestSize + " bytes"); 576 } 577 // Enforcing the call queue size, this triggers a retry in the client 578 // This is a bit late to be doing this check - we have already read in the 579 // total request. 580 if ( 581 (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) 582 > this.rpcServer.maxQueueSizeInBytes 583 ) { 584 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 585 sendErrorResponseForCall(id, totalRequestSize, span, 586 "Call queue is full on " + this.rpcServer.server.getServerName() 587 + ", is hbase.ipc.server.max.callqueue.size too small?", 588 RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 589 return; 590 } 591 MethodDescriptor md = null; 592 Message param = null; 593 ExtendedCellScanner cellScanner = null; 594 try { 595 if (header.hasRequestParam() && header.getRequestParam()) { 596 md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); 597 if (md == null) { 598 throw new UnsupportedOperationException(header.getMethodName()); 599 } 600 builder = this.service.getRequestPrototype(md).newBuilderForType(); 601 cis.resetSizeCounter(); 602 int paramSize = cis.readRawVarint32(); 603 offset += cis.getTotalBytesRead(); 604 if (builder != null) { 605 ProtobufUtil.mergeFrom(builder, cis, paramSize); 606 param = builder.build(); 607 } 608 offset += paramSize; 609 } else { 610 // currently header must have request param, so we directly throw 611 // exception here 612 String msg = "Invalid request header: " + TextFormat.shortDebugString(header) 613 + ", should have param set in it"; 614 RpcServer.LOG.warn(msg); 615 throw new DoNotRetryIOException(msg); 616 } 617 if (header.hasCellBlockMeta()) { 618 buf.position(offset); 619 ByteBuff dup = buf.duplicate(); 620 dup.limit(offset + header.getCellBlockMeta().getLength()); 621 cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec, 622 this.compressionCodec, dup); 623 } 624 } catch (Throwable thrown) { 625 InetSocketAddress address = this.rpcServer.getListenerAddress(); 626 String msg = (address != null ? address : "(channel closed)") 627 + " is unable to read call parameter from client " + getHostAddress(); 628 RpcServer.LOG.warn(msg, thrown); 629 630 this.rpcServer.metrics.exception(thrown); 631 632 final Throwable responseThrowable; 633 if (thrown instanceof LinkageError) { 634 // probably the hbase hadoop version does not match the running hadoop version 635 responseThrowable = new DoNotRetryIOException(thrown); 636 } else if (thrown instanceof UnsupportedOperationException) { 637 // If the method is not present on the server, do not retry. 638 responseThrowable = new DoNotRetryIOException(thrown); 639 } else { 640 responseThrowable = thrown; 641 } 642 643 sendErrorResponseForCall(id, totalRequestSize, span, 644 msg + "; " + responseThrowable.getMessage(), responseThrowable); 645 return; 646 } 647 648 int timeout = 0; 649 if (header.hasTimeout() && header.getTimeout() > 0) { 650 timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); 651 } 652 ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, 653 totalRequestSize, this.addr, timeout, this.callCleanup); 654 655 if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { 656 // unset span do that it's not closed in the finally block 657 span = null; 658 } else { 659 this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); 660 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 661 call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, 662 "Call queue is full on " + this.rpcServer.server.getServerName() 663 + ", too many items queued ?"); 664 TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 665 call.sendResponseIfReady(); 666 } 667 } finally { 668 if (span != null) { 669 span.end(); 670 } 671 } 672 } 673 674 private void sendErrorResponseForCall(int id, long totalRequestSize, Span span, String msg, 675 Throwable responseThrowable) throws IOException { 676 ServerCall<?> failedcall = createCall(id, this.service, null, null, null, null, 677 totalRequestSize, null, 0, this.callCleanup); 678 failedcall.setResponse(null, null, responseThrowable, msg); 679 TraceUtil.setError(span, responseThrowable); 680 failedcall.sendResponseIfReady(); 681 } 682 683 protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException { 684 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1); 685 ServerCall.setExceptionResponse(e, msg, headerBuilder); 686 ByteBuffer headerBuf = 687 ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null); 688 BufferChain buf = new BufferChain(headerBuf); 689 return () -> buf; 690 } 691 692 private void doBadPreambleHandling(String msg) throws IOException { 693 doBadPreambleHandling(msg, new FatalConnectionException(msg)); 694 } 695 696 private void doBadPreambleHandling(String msg, Exception e) throws IOException { 697 RpcServer.LOG.warn(msg, e); 698 doRespond(getErrorResponse(msg, e)); 699 } 700 701 private void doPreambleResponse(Message resp) throws IOException { 702 ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build(); 703 ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null); 704 BufferChain bufChain = new BufferChain(buf); 705 doRespond(() -> bufChain); 706 } 707 708 private boolean doConnectionRegistryResponse() throws IOException { 709 if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) { 710 // should be in tests or some scenarios where we should not reach here 711 return false; 712 } 713 // on backup masters, this request may be blocked since we need to fetch it from filesystem, 714 // but since it is just backup master, it is not a critical problem 715 String clusterId = ((ConnectionRegistryEndpoint) rpcServer.server).getClusterId(); 716 RpcServer.LOG.debug("Response connection registry, clusterId = '{}'", clusterId); 717 if (clusterId == null) { 718 // should be in tests or some scenarios where we should not reach here 719 return false; 720 } 721 GetConnectionRegistryResponse resp = 722 GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build(); 723 doPreambleResponse(resp); 724 return true; 725 } 726 727 private void doSecurityPreambleResponse() throws IOException { 728 if (rpcServer.isSecurityEnabled) { 729 SecurityPreamableResponse resp = SecurityPreamableResponse.newBuilder() 730 .setServerPrincipal(rpcServer.serverPrincipal).build(); 731 doPreambleResponse(resp); 732 } else { 733 // security is not enabled, do not need a principal when connecting, throw a special exception 734 // to let client know it should just use simple authentication 735 doRespond(getErrorResponse("security is not enabled", new SecurityNotEnabledException())); 736 } 737 } 738 739 protected final void callCleanupIfNeeded() { 740 if (callCleanup != null) { 741 callCleanup.run(); 742 callCleanup = null; 743 } 744 } 745 746 protected enum PreambleResponse { 747 SUCCEED, // successfully processed the rpc preamble header 748 CONTINUE, // the preamble header is for other purpose, wait for the rpc preamble header 749 CLOSE // close the rpc connection 750 } 751 752 protected final PreambleResponse processPreamble(ByteBuffer preambleBuffer) throws IOException { 753 assert preambleBuffer.remaining() == 6; 754 if ( 755 ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6, 756 RpcClient.REGISTRY_PREAMBLE_HEADER, 0, 6) && doConnectionRegistryResponse() 757 ) { 758 return PreambleResponse.CLOSE; 759 } 760 if ( 761 ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6, 762 RpcClient.SECURITY_PREAMBLE_HEADER, 0, 6) 763 ) { 764 doSecurityPreambleResponse(); 765 return PreambleResponse.CONTINUE; 766 } 767 if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, RPC_HEADER, 0, 4)) { 768 doBadPreambleHandling( 769 "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" 770 + Bytes.toStringBinary( 771 ByteBufferUtils.toBytes(preambleBuffer, preambleBuffer.position(), RPC_HEADER.length), 772 0, RPC_HEADER.length) 773 + " from " + toString()); 774 return PreambleResponse.CLOSE; 775 } 776 int version = preambleBuffer.get(preambleBuffer.position() + 4) & 0xFF; 777 byte authByte = preambleBuffer.get(preambleBuffer.position() + 5); 778 if (version != RpcServer.CURRENT_VERSION) { 779 String msg = getFatalConnectionString(version, authByte); 780 doBadPreambleHandling(msg, new WrongVersionException(msg)); 781 return PreambleResponse.CLOSE; 782 } 783 784 this.provider = rpcServer.saslProviders.selectProvider(authByte); 785 if (this.provider == null) { 786 String msg = getFatalConnectionString(version, authByte); 787 doBadPreambleHandling(msg, new BadAuthException(msg)); 788 return PreambleResponse.CLOSE; 789 } 790 // TODO this is a wart while simple auth'n doesn't go through sasl. 791 if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) { 792 if (this.rpcServer.allowFallbackToSimpleAuth) { 793 this.rpcServer.metrics.authenticationFallback(); 794 authenticatedWithFallback = true; 795 } else { 796 AccessDeniedException ae = new AccessDeniedException("Authentication is required"); 797 doRespond(getErrorResponse(ae.getMessage(), ae)); 798 return PreambleResponse.CLOSE; 799 } 800 } 801 if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) { 802 doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, 803 null); 804 provider = rpcServer.saslProviders.getSimpleProvider(); 805 // client has already sent the initial Sasl message and we 806 // should ignore it. Both client and server should fall back 807 // to simple auth from now on. 808 skipInitialSaslHandshake = true; 809 } 810 useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider); 811 return PreambleResponse.SUCCEED; 812 } 813 814 boolean isSimpleAuthentication() { 815 return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider; 816 } 817 818 public abstract boolean isConnectionOpen(); 819 820 public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md, 821 RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size, 822 InetAddress remoteAddress, int timeout, CallCleanup reqCleanup); 823 824 private static class ByteBuffByteInput extends ByteInput { 825 826 private ByteBuff buf; 827 private int length; 828 829 ByteBuffByteInput(ByteBuff buf, int length) { 830 this.buf = buf; 831 this.length = length; 832 } 833 834 @Override 835 public byte read(int offset) { 836 return this.buf.get(offset); 837 } 838 839 @Override 840 public int read(int offset, byte[] out, int outOffset, int len) { 841 this.buf.get(offset, out, outOffset, len); 842 return len; 843 } 844 845 @Override 846 public int read(int offset, ByteBuffer out) { 847 int len = out.remaining(); 848 this.buf.get(out, offset, len); 849 return len; 850 } 851 852 @Override 853 public int size() { 854 return this.length; 855 } 856 } 857}