/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;

/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
  justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
@InterfaceAudience.Private
abstract class ServerRpcConnection implements Closeable {

  private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter();

  protected final RpcServer rpcServer;
  // Whether the connection header has been read or not.
  protected boolean connectionHeaderRead = false;

  protected CallCleanup callCleanup;

  // Cache the remote host & port info so that even if the socket is
  // disconnected, we can say where it used to be connected to.
  protected String hostAddress;
  protected int remotePort;
  protected InetAddress addr;
  protected ConnectionHeader connectionHeader;
  protected Map<String, byte[]> connectionAttributes;

  /**
   * Codec the client asked us to use.
   */
  protected Codec codec;
  /**
   * Compression codec the client asked us to use.
   */
  protected CompressionCodec compressionCodec;
  protected BlockingService service;

  protected SaslServerAuthenticationProvider provider;
  protected boolean skipInitialSaslHandshake;
  protected boolean useSasl;
  protected HBaseSaslRpcServer saslServer;

  // was authentication allowed with a fallback to simple auth
  protected boolean authenticatedWithFallback;

  protected boolean retryImmediatelySupported = false;

  protected User user = null;
  protected UserGroupInformation ugi = null;
  protected SaslServerAuthenticationProviders saslProviders = null;
  protected X509Certificate[] clientCertificateChain = null;

  public ServerRpcConnection(RpcServer rpcServer) {
    this.rpcServer = rpcServer;
    this.callCleanup = null;
    this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf());
  }

  @Override
  public String toString() {
    return getHostAddress() + ":" + remotePort;
  }

  public String getHostAddress() {
    return hostAddress;
  }

  public InetAddress getHostInetAddress() {
    return addr;
  }

  public int getRemotePort() {
    return remotePort;
  }

  public VersionInfo getVersionInfo() {
    if (connectionHeader != null && connectionHeader.hasVersionInfo()) {
      return connectionHeader.getVersionInfo();
    }
    return null;
  }

  private String getFatalConnectionString(final int version, final byte authByte) {
    return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version
      + ", authMethod=" + authByte +
      // The provider may be null if we failed to parse the header of the request
      ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName())
      + " from " + toString();
  }

  /**
   * Set up cell block codecs
   */
  private void setupCellBlockCodecs() throws FatalConnectionException {
    // TODO: Plug in other supported decoders.
    if (!connectionHeader.hasCellBlockCodecClass()) {
      return;
    }
    String className = connectionHeader.getCellBlockCodecClass();
    if (className == null || className.length() == 0) {
      return;
    }
    try {
      this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new UnsupportedCellCodecException(className, e);
    }
    if (!connectionHeader.hasCellBlockCompressorClass()) {
      return;
    }
    className = connectionHeader.getCellBlockCompressorClass();
    try {
      this.compressionCodec =
        (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new UnsupportedCompressionCodecException(className, e);
    }
  }

  /**
   * Set up cipher for rpc encryption with Apache Commons Crypto.
   */
  private Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> setupCryptoCipher()
    throws FatalConnectionException {
    // If simple auth, return
    if (saslServer == null) {
      return null;
    }
    // check if rpc encryption with Crypto AES is enabled
    String qop = saslServer.getNegotiatedQop();
    boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop);
    boolean isCryptoAesEncryption = isEncryption
      && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false);
    if (!isCryptoAesEncryption) {
      return null;
    }
    if (!connectionHeader.hasRpcCryptoCipherTransformation()) {
      return null;
    }
    String transformation = connectionHeader.getRpcCryptoCipherTransformation();
    if (transformation == null || transformation.length() == 0) {
      return null;
    }
    // Negotiate AES once the SASL handshake has completed.
    // The crypto metadata needs to be encrypted and sent to the client.
    Properties properties = new Properties();
    // the property for SecureRandomFactory
    properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
      this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
        "org.apache.commons.crypto.random.JavaCryptoRandom"));
    // the property for cipher class
    properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
      this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
        "org.apache.commons.crypto.cipher.JceCipher"));

    int cipherKeyBits =
      this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
    // generate key and iv
    if (cipherKeyBits % 8 != 0) {
      throw new IllegalArgumentException(
        "The AES cipher key size in bits should be a multiple of 8");
    }
    int len = cipherKeyBits / 8;
    byte[] inKey = new byte[len];
    byte[] outKey = new byte[len];
    byte[] inIv = new byte[len];
    byte[] outIv = new byte[len];

    CryptoAES cryptoAES;
    try {
      // generate the cipher meta data with SecureRandom
      CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
      secureRandom.nextBytes(inKey);
      secureRandom.nextBytes(outKey);
      secureRandom.nextBytes(inIv);
      secureRandom.nextBytes(outIv);

      // create CryptoAES for server
      cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv);
    } catch (GeneralSecurityException | IOException ex) {
      throw new UnsupportedCryptoException(ex.getMessage(), ex);
    }
    // create SaslCipherMeta and send it to the client;
    // for the client, the [inKey, outKey] and [inIv, outIv] pairs are reversed
    RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
    ccmBuilder.setTransformation(transformation);
    ccmBuilder.setInIv(getByteString(outIv));
    ccmBuilder.setInKey(getByteString(outKey));
    ccmBuilder.setOutIv(getByteString(inIv));
    ccmBuilder.setOutKey(getByteString(inKey));
    RPCProtos.ConnectionHeaderResponse resp =
      RPCProtos.ConnectionHeaderResponse.newBuilder().setCryptoCipherMeta(ccmBuilder).build();
    return Pair.newPair(resp, cryptoAES);
  }

  private ByteString getByteString(byte[] bytes) {
    // return singleton to reduce object allocation
    return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
  }

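  /**
   * Build a {@link UserGroupInformation} from the connection header's {@code UserInformation}: a
   * plain remote user when only an effective user is given, or a proxy user when a real user is
   * also present. Returns null when the header carries no user info or no effective user.
   */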
  private UserGroupInformation createUser(ConnectionHeader head) {
    UserGroupInformation ugi = null;

    if (!head.hasUserInfo()) {
      return null;
    }
    UserInformation userInfoProto = head.getUserInfo();
    String effectiveUser = null;
    if (userInfoProto.hasEffectiveUser()) {
      effectiveUser = userInfoProto.getEffectiveUser();
    }
    String realUser = null;
    if (userInfoProto.hasRealUser()) {
      realUser = userInfoProto.getRealUser();
    }
    if (effectiveUser != null) {
      if (realUser != null) {
        UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser);
        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
      } else {
        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
      }
    }
    return ugi;
  }

  protected final void disposeSasl() {
    if (saslServer != null) {
      saslServer.dispose();
      saslServer = null;
    }
  }

  /**
   * No protobuf encoding of raw sasl messages
   */
  protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass,
    String error) throws IOException {
    BufferChain bc;
    // In my testing, I have noticed that sasl messages are usually
    // in the ballpark of 100-200 bytes. That's why the initial capacity is 256.
    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
      DataOutputStream out = new DataOutputStream(saslResponse)) {
      out.writeInt(status.state); // write status
      if (status == SaslStatus.SUCCESS) {
        rv.write(out);
      } else {
        WritableUtils.writeString(out, errorClass);
        WritableUtils.writeString(out, error);
      }
      bc = new BufferChain(saslResponse.getByteBuffer());
    }
    doRespond(() -> bc);
  }

  HBaseSaslRpcServer getOrCreateSaslServer() throws IOException {
    if (saslServer == null) {
      saslServer = new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager);
    }
    return saslServer;
  }

  void finishSaslNegotiation() throws IOException {
    String negotiatedQop = saslServer.getNegotiatedQop();
    SaslUtil.verifyNegotiatedQop(saslServer.getRequestedQop(), negotiatedQop);
    ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager);
    RpcServer.LOG.debug(
      "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi,
      negotiatedQop);
    rpcServer.metrics.authenticationSuccess();
    RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
  }

  public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
    if (connectionHeaderRead) {
      processRequest(buf);
    } else {
      processConnectionHeader(buf);
      callCleanupIfNeeded();
      this.connectionHeaderRead = true;
      this.rpcServer.getRpcCoprocessorHost().preAuthorizeConnection(connectionHeader, addr);
      if (rpcServer.needAuthorization() && !authorizeConnection()) {
        // Throw FatalConnectionException wrapping ACE so the client does the right thing and
        // closes down the connection instead of trying to read a non-existent return value.
        throw new AccessDeniedException("Connection from " + this + " for service "
          + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
      }
      this.user = this.rpcServer.userProvider.create(this.ugi);
      this.rpcServer.getRpcCoprocessorHost().postAuthorizeConnection(
        this.user != null ? this.user.getName() : null, this.clientCertificateChain);
    }
  }

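  /**
   * Authorize the connection: verify proxy-user impersonation when the auth provider supports it,
   * then check service-level authorization. On failure an error response is sent back to the
   * client and {@code false} is returned.
   */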
  private boolean authorizeConnection() throws IOException {
    try {
      // If the auth method is DIGEST, the token was obtained by the
      // real user for the effective user, therefore it is not required to
      // authorize the real user. doAs is allowed only for simple or kerberos
      // authentication
      if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) {
        ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
      }
      this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
      this.rpcServer.metrics.authorizationSuccess();
    } catch (AuthorizationException ae) {
      if (RpcServer.LOG.isDebugEnabled()) {
        RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
      }
      this.rpcServer.metrics.authorizationFailure();
      doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae)));
      return false;
    }
    return true;
  }

  private CodedInputStream createCis(ByteBuff buf) {
    // Here we read in the header. We avoid having pb
    // do its default 4k allocation for CodedInputStream. We force it to use
    // the backing array.
    CodedInputStream cis;
    if (buf.hasArray()) {
      cis = UnsafeByteOperations
        .unsafeWrap(buf.array(), buf.arrayOffset() + buf.position(), buf.limit()).newCodedInput();
    } else {
      cis = UnsafeByteOperations.unsafeWrap(new ByteBuffByteInput(buf, buf.limit()), 0, buf.limit())
        .newCodedInput();
    }
    cis.enableAliasing(true);
    return cis;
  }

  // Reads the connection header that follows the version/auth preamble
  private void processConnectionHeader(ByteBuff buf) throws IOException {
    this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf));

    // we want to copy the attributes prior to releasing the buffer so that they don't get
    // corrupted eventually
    if (connectionHeader.getAttributeList().isEmpty()) {
      this.connectionAttributes = Collections.emptyMap();
    } else {
      this.connectionAttributes =
        Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size());
      for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) {
        this.connectionAttributes.put(nameBytesPair.getName(),
          nameBytesPair.getValue().toByteArray());
      }
    }
    String serviceName = connectionHeader.getServiceName();
    if (serviceName == null) {
      throw new EmptyServiceNameException();
    }
    this.service = RpcServer.getService(this.rpcServer.services, serviceName);
    if (this.service == null) {
      throw new UnknownServiceException(serviceName);
    }
    setupCellBlockCodecs();
    sendConnectionHeaderResponseIfNeeded();
    UserGroupInformation protocolUser = createUser(connectionHeader);
    if (!useSasl) {
      ugi = protocolUser;
      if (ugi != null) {
        ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
      }
      // audit logging for SASL authenticated users happens in saslReadAndProcess()
      if (authenticatedWithFallback) {
        RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi,
          getHostAddress());
      }
    } else {
      // user is authenticated
      ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod());
      // Now we check if this is a proxy user case. If the protocol user is
      // different from the 'user', it is a proxy user scenario. However,
      // this is not allowed if the user authenticated with DIGEST.
      if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
        if (!provider.supportsProtocolAuthentication()) {
          // Not allowed to doAs if token authentication is used
          throw new AccessDeniedException("Authenticated user (" + ugi
            + ") doesn't match what the client claims to be (" + protocolUser + ")");
        } else {
          // Effective user can be different from authenticated user
          // for simple auth or kerberos auth
          // The user is the real user. Now we create a proxy user
          UserGroupInformation realUser = ugi;
          ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser);
          // Now the user is a proxy user, set the authentication method to PROXY.
          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
        }
      }
    }
    String version;
    if (this.connectionHeader.hasVersionInfo()) {
      // see if this connection will support RetryImmediatelyException
      this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
      version = this.connectionHeader.getVersionInfo().getVersion();
    } else {
      version = "UNKNOWN";
    }
    RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}",
      this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName);
  }

  /**
   * Send the response for the connection header
   */
  private void sendConnectionHeaderResponseIfNeeded() throws FatalConnectionException {
    Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> pair = setupCryptoCipher();
    // Respond to the connection header only if Crypto AES is enabled
    if (pair == null) {
      return;
    }
    try {
      int size = pair.getFirst().getSerializedSize();
      BufferChain bc;
      try (ByteBufferOutputStream bbOut = new ByteBufferOutputStream(4 + size);
        DataOutputStream out = new DataOutputStream(bbOut)) {
        out.writeInt(size);
        pair.getFirst().writeTo(out);
        bc = new BufferChain(bbOut.getByteBuffer());
      }
      doRespond(new RpcResponse() {

        @Override
        public BufferChain getResponse() {
          return bc;
        }

        @Override
        public void done() {
          // must switch after sending the connection header response, as the client still uses the
          // original SaslClient to unwrap the data we send back
          saslServer.switchToCryptoAES(pair.getSecond());
        }
      });
    } catch (IOException ex) {
      throw new UnsupportedCryptoException(ex.getMessage(), ex);
    }
  }

  protected abstract void doRespond(RpcResponse resp) throws IOException;

  /**
   * The request header, the request param, and optionally an encoded data buffer are all in this
   * one array.
   * <p/>
   * Will be overridden in tests.
   */
  protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
    long totalRequestSize = buf.limit();
    int offset = 0;
    // Here we read in the header. We avoid having pb
    // do its default 4k allocation for CodedInputStream. We force it to use
    // the backing array.
    CodedInputStream cis = createCis(buf);
    int headerSize = cis.readRawVarint32();
    offset = cis.getTotalBytesRead();
    Message.Builder builder = RequestHeader.newBuilder();
    ProtobufUtil.mergeFrom(builder, cis, headerSize);
    RequestHeader header = (RequestHeader) builder.build();
    offset += headerSize;
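    // The client propagates its OpenTelemetry trace context in the header's RPCTInfo; extract it
    // here so that the server-side span created below joins the client's trace.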
    Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
      .extract(Context.current(), header.getTraceInfo(), getter);

    // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope
    // are early-exits due to error cases. There's only one success path, the asynchronous call to
    // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented
    // by null-ing out the reference in this scope. All other paths end the span. Thus, and in
    // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally
    // block iff the span is non-null.
    Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
    try (Scope ignored = span.makeCurrent()) {
      int id = header.getCallId();
      // HBASE-28128 - if the server is aborting, don't bother trying to process. It will
      // fail at the handler layer, but worse might result in CallQueueTooBigException if the
      // queue is full but the server is not properly processing requests. Better to throw an
      // aborted exception here so that the client can properly react.
      if (rpcServer.server != null && rpcServer.server.isAborted()) {
        RegionServerAbortedException serverIsAborted = new RegionServerAbortedException(
          "Server " + rpcServer.server.getServerName() + " aborting");
        this.rpcServer.metrics.exception(serverIsAborted);
        sendErrorResponseForCall(id, totalRequestSize, span, serverIsAborted.getMessage(),
          serverIsAborted);
        return;
      }

      if (RpcServer.LOG.isTraceEnabled()) {
        RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
          + " totalRequestSize: " + totalRequestSize + " bytes");
      }
      // Enforcing the call queue size, this triggers a retry in the client
      // This is a bit late to be doing this check - we have already read in the
      // total request.
      if (
        (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum())
            > this.rpcServer.maxQueueSizeInBytes
      ) {
        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        sendErrorResponseForCall(id, totalRequestSize, span,
          "Call queue is full on " + this.rpcServer.server.getServerName()
            + ", is hbase.ipc.server.max.callqueue.size too small?",
          RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        return;
      }
      MethodDescriptor md = null;
      Message param = null;
      ExtendedCellScanner cellScanner = null;
      try {
        if (header.hasRequestParam() && header.getRequestParam()) {
          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
          if (md == null) {
            throw new UnsupportedOperationException(header.getMethodName());
          }
          builder = this.service.getRequestPrototype(md).newBuilderForType();
          cis.resetSizeCounter();
          int paramSize = cis.readRawVarint32();
          offset += cis.getTotalBytesRead();
          if (builder != null) {
            ProtobufUtil.mergeFrom(builder, cis, paramSize);
            param = builder.build();
          }
          offset += paramSize;
        } else {
          // currently the header must have the request param, so we directly throw an
          // exception here
          String msg = "Invalid request header: " + TextFormat.shortDebugString(header)
            + ", should have param set in it";
          RpcServer.LOG.warn(msg);
          throw new DoNotRetryIOException(msg);
        }
        if (header.hasCellBlockMeta()) {
          buf.position(offset);
          ByteBuff dup = buf.duplicate();
          dup.limit(offset + header.getCellBlockMeta().getLength());
          cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
            this.compressionCodec, dup);
        }
      } catch (Throwable thrown) {
        InetSocketAddress address = this.rpcServer.getListenerAddress();
        String msg = (address != null ? address : "(channel closed)")
          + " is unable to read call parameter from client " + getHostAddress();
        RpcServer.LOG.warn(msg, thrown);

        this.rpcServer.metrics.exception(thrown);

        final Throwable responseThrowable;
        if (thrown instanceof LinkageError) {
          // probably the hbase hadoop version does not match the running hadoop version
          responseThrowable = new DoNotRetryIOException(thrown);
        } else if (thrown instanceof UnsupportedOperationException) {
          // If the method is not present on the server, do not retry.
          responseThrowable = new DoNotRetryIOException(thrown);
        } else {
          responseThrowable = thrown;
        }

        sendErrorResponseForCall(id, totalRequestSize, span,
          msg + "; " + responseThrowable.getMessage(), responseThrowable);
        return;
      }

      int timeout = 0;
      if (header.hasTimeout() && header.getTimeout() > 0) {
        timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
      }
      ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner,
        totalRequestSize, this.addr, timeout, this.callCleanup);

      if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
        // unset the span so that it is not closed in the finally block
        span = null;
      } else {
        this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
          "Call queue is full on " + this.rpcServer.server.getServerName()
            + ", too many items queued ?");
        TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        call.sendResponseIfReady();
      }
    } finally {
      if (span != null) {
        span.end();
      }
    }
  }

  private void sendErrorResponseForCall(int id, long totalRequestSize, Span span, String msg,
    Throwable responseThrowable) throws IOException {
    ServerCall<?> failedcall = createCall(id, this.service, null, null, null, null,
      totalRequestSize, null, 0, this.callCleanup);
    failedcall.setResponse(null, null, responseThrowable, msg);
    TraceUtil.setError(span, responseThrowable);
    failedcall.sendResponseIfReady();
  }

  protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
    ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
    ServerCall.setExceptionResponse(e, msg, headerBuilder);
    ByteBuffer headerBuf =
      ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null);
    BufferChain buf = new BufferChain(headerBuf);
    return () -> buf;
  }

  private void doBadPreambleHandling(String msg) throws IOException {
    doBadPreambleHandling(msg, new FatalConnectionException(msg));
  }

  private void doBadPreambleHandling(String msg, Exception e) throws IOException {
    RpcServer.LOG.warn(msg, e);
    doRespond(getErrorResponse(msg, e));
  }

  private void doPreambleResponse(Message resp) throws IOException {
    ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build();
    ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null);
    BufferChain bufChain = new BufferChain(buf);
    doRespond(() -> bufChain);
  }

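  /**
   * Reply to the connection registry preamble with this cluster's id. Returns false when the
   * server is not a {@link ConnectionRegistryEndpoint} or the cluster id is not available, in
   * which case no response is sent.
   */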
  private boolean doConnectionRegistryResponse() throws IOException {
    if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) {
      // should only happen in tests or in scenarios where we should not reach here
      return false;
    }
    // on backup masters, this request may block since we need to fetch the cluster id from the
    // filesystem, but since it is just a backup master, it is not a critical problem
    String clusterId = ((ConnectionRegistryEndpoint) rpcServer.server).getClusterId();
    RpcServer.LOG.debug("Responding with connection registry, clusterId = '{}'", clusterId);
    if (clusterId == null) {
      // should only happen in tests or in scenarios where we should not reach here
      return false;
    }
    GetConnectionRegistryResponse resp =
      GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build();
    doPreambleResponse(resp);
    return true;
  }

  private void doSecurityPreambleResponse() throws IOException {
    if (rpcServer.isSecurityEnabled) {
      SecurityPreamableResponse resp = SecurityPreamableResponse.newBuilder()
        .setServerPrincipal(rpcServer.serverPrincipal).build();
      doPreambleResponse(resp);
    } else {
      // security is not enabled, so no principal is needed when connecting; throw a special
      // exception to let the client know it should just use simple authentication
      doRespond(getErrorResponse("security is not enabled", new SecurityNotEnabledException()));
    }
  }

  protected final void callCleanupIfNeeded() {
    if (callCleanup != null) {
      callCleanup.run();
      callCleanup = null;
    }
  }

  protected enum PreambleResponse {
    SUCCEED, // successfully processed the rpc preamble header
    CONTINUE, // the preamble header is for another purpose, wait for the rpc preamble header
    CLOSE // close the rpc connection
  }

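  /**
   * Process the 6-byte connection preamble. Two special preambles are handled first: a connection
   * registry request (answered with this cluster's id) and a security handshake request (answered
   * with the server principal, or an error when security is disabled). Otherwise the preamble must
   * be the {@code RPC_HEADER} magic bytes followed by the rpc version byte and the auth method
   * code byte, which selects the SASL authentication provider for this connection.
   * @return {@link PreambleResponse#SUCCEED} when the rpc preamble has been accepted,
   *         {@link PreambleResponse#CONTINUE} when another preamble is expected, or
   *         {@link PreambleResponse#CLOSE} when the connection should be closed.
   */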
  protected final PreambleResponse processPreamble(ByteBuffer preambleBuffer) throws IOException {
    assert preambleBuffer.remaining() == 6;
    if (
      ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6,
        RpcClient.REGISTRY_PREAMBLE_HEADER, 0, 6) && doConnectionRegistryResponse()
    ) {
      return PreambleResponse.CLOSE;
    }
    if (
      ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6,
        RpcClient.SECURITY_PREAMBLE_HEADER, 0, 6)
    ) {
      doSecurityPreambleResponse();
      return PreambleResponse.CONTINUE;
    }
    if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, RPC_HEADER, 0, 4)) {
      doBadPreambleHandling(
        "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER="
          + Bytes.toStringBinary(
            ByteBufferUtils.toBytes(preambleBuffer, preambleBuffer.position(), RPC_HEADER.length),
            0, RPC_HEADER.length)
          + " from " + toString());
      return PreambleResponse.CLOSE;
    }
    int version = preambleBuffer.get(preambleBuffer.position() + 4) & 0xFF;
    byte authByte = preambleBuffer.get(preambleBuffer.position() + 5);
    if (version != RpcServer.CURRENT_VERSION) {
      String msg = getFatalConnectionString(version, authByte);
      doBadPreambleHandling(msg, new WrongVersionException(msg));
      return PreambleResponse.CLOSE;
    }

    this.provider = this.saslProviders.selectProvider(authByte);
    if (this.provider == null) {
      String msg = getFatalConnectionString(version, authByte);
      doBadPreambleHandling(msg, new BadAuthException(msg));
      return PreambleResponse.CLOSE;
    }
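    // Reconcile the requested auth method with the server's security configuration: a secure
    // server may optionally allow a simple-auth client to fall back, while an insecure server
    // tells a SASL client to switch to simple auth.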
    // TODO this is a wart while simple auth'n doesn't go through sasl.
    if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) {
      if (this.rpcServer.allowFallbackToSimpleAuth) {
        this.rpcServer.metrics.authenticationFallback();
        authenticatedWithFallback = true;
      } else {
        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
        doRespond(getErrorResponse(ae.getMessage(), ae));
        return PreambleResponse.CLOSE;
      }
    }
    if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) {
      doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
        null);
      provider = saslProviders.getSimpleProvider();
      // The client has already sent the initial SASL message and we
      // should ignore it. Both client and server should fall back
      // to simple auth from now on.
      skipInitialSaslHandshake = true;
    }
    useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider);
    return PreambleResponse.SUCCEED;
  }

  boolean isSimpleAuthentication() {
    return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider;
  }

  public abstract boolean isConnectionOpen();

  public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
    RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size,
    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);

  private static class ByteBuffByteInput extends ByteInput {

    private ByteBuff buf;
    private int length;

    ByteBuffByteInput(ByteBuff buf, int length) {
      this.buf = buf;
      this.length = length;
    }

    @Override
    public byte read(int offset) {
      return this.buf.get(offset);
    }

    @Override
    public int read(int offset, byte[] out, int outOffset, int len) {
      this.buf.get(offset, out, outOffset, len);
      return len;
    }

    @Override
    public int read(int offset, ByteBuffer out) {
      int len = out.remaining();
      this.buf.get(out, offset, len);
      return len;
    }

    @Override
    public int size() {
      return this.length;
    }
  }
}