001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER; 021 022import io.opentelemetry.api.GlobalOpenTelemetry; 023import io.opentelemetry.api.trace.Span; 024import io.opentelemetry.context.Context; 025import io.opentelemetry.context.Scope; 026import io.opentelemetry.context.propagation.TextMapGetter; 027import java.io.Closeable; 028import java.io.DataOutputStream; 029import java.io.IOException; 030import java.net.InetAddress; 031import java.net.InetSocketAddress; 032import java.nio.ByteBuffer; 033import java.security.GeneralSecurityException; 034import java.security.cert.X509Certificate; 035import java.util.Collections; 036import java.util.Map; 037import java.util.Objects; 038import java.util.Properties; 039import org.apache.commons.crypto.cipher.CryptoCipherFactory; 040import org.apache.commons.crypto.random.CryptoRandom; 041import org.apache.commons.crypto.random.CryptoRandomFactory; 042import org.apache.hadoop.hbase.DoNotRetryIOException; 043import org.apache.hadoop.hbase.ExtendedCellScanner; 044import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint; 045import org.apache.hadoop.hbase.client.VersionInfoUtil; 046import org.apache.hadoop.hbase.codec.Codec; 047import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 048import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 049import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 050import org.apache.hadoop.hbase.nio.ByteBuff; 051import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; 052import org.apache.hadoop.hbase.security.AccessDeniedException; 053import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; 054import org.apache.hadoop.hbase.security.SaslStatus; 055import org.apache.hadoop.hbase.security.SaslUtil; 056import org.apache.hadoop.hbase.security.User; 057import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider; 058import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders; 059import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider; 060import org.apache.hadoop.hbase.trace.TraceUtil; 061import org.apache.hadoop.hbase.util.ByteBufferUtils; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.Pair; 064import org.apache.hadoop.io.IntWritable; 065import org.apache.hadoop.io.Writable; 066import org.apache.hadoop.io.WritableUtils; 067import org.apache.hadoop.io.compress.CompressionCodec; 068import org.apache.hadoop.security.UserGroupInformation; 069import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 070import org.apache.hadoop.security.authorize.AuthorizationException; 071import org.apache.hadoop.security.authorize.ProxyUsers; 072import org.apache.yetus.audience.InterfaceAudience; 073 074import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 075import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 076import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; 077import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 078import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 079import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 080import org.apache.hbase.thirdparty.com.google.protobuf.Message; 081import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 082import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 083 084import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; 095 096/** Reads calls from a connection and queues them for handling. */ 097@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", 098 justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") 099@InterfaceAudience.Private 100abstract class ServerRpcConnection implements Closeable { 101 102 private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter(); 103 104 protected final RpcServer rpcServer; 105 // If the connection header has been read or not. 106 protected boolean connectionHeaderRead = false; 107 108 protected CallCleanup callCleanup; 109 110 // Cache the remote host & port info so that even if the socket is 111 // disconnected, we can say where it used to connect to. 112 protected String hostAddress; 113 protected int remotePort; 114 protected InetAddress addr; 115 protected ConnectionHeader connectionHeader; 116 protected Map<String, byte[]> connectionAttributes; 117 118 /** 119 * Codec the client asked use. 120 */ 121 protected Codec codec; 122 /** 123 * Compression codec the client asked us use. 124 */ 125 protected CompressionCodec compressionCodec; 126 protected BlockingService service; 127 128 protected SaslServerAuthenticationProvider provider; 129 protected boolean skipInitialSaslHandshake; 130 protected boolean useSasl; 131 protected HBaseSaslRpcServer saslServer; 132 133 // was authentication allowed with a fallback to simple auth 134 protected boolean authenticatedWithFallback; 135 136 protected boolean retryImmediatelySupported = false; 137 138 protected User user = null; 139 protected UserGroupInformation ugi = null; 140 protected SaslServerAuthenticationProviders saslProviders = null; 141 protected X509Certificate[] clientCertificateChain = null; 142 143 public ServerRpcConnection(RpcServer rpcServer) { 144 this.rpcServer = rpcServer; 145 this.callCleanup = null; 146 this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf()); 147 } 148 149 @Override 150 public String toString() { 151 return getHostAddress() + ":" + remotePort; 152 } 153 154 public String getHostAddress() { 155 return hostAddress; 156 } 157 158 public InetAddress getHostInetAddress() { 159 return addr; 160 } 161 162 public int getRemotePort() { 163 return remotePort; 164 } 165 166 public VersionInfo getVersionInfo() { 167 if (connectionHeader != null && connectionHeader.hasVersionInfo()) { 168 return connectionHeader.getVersionInfo(); 169 } 170 return null; 171 } 172 173 private String getFatalConnectionString(final int version, final byte authByte) { 174 return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version 175 + ", authMethod=" + authByte + 176 // The provider may be null if we failed to parse the header of the request 177 ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName()) 178 + " from " + toString(); 179 } 180 181 /** 182 * Set up cell block codecs 183 */ 184 private void setupCellBlockCodecs() throws FatalConnectionException { 185 // TODO: Plug in other supported decoders. 186 if (!connectionHeader.hasCellBlockCodecClass()) { 187 return; 188 } 189 String className = connectionHeader.getCellBlockCodecClass(); 190 if (className == null || className.length() == 0) { 191 return; 192 } 193 try { 194 this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance(); 195 } catch (Exception e) { 196 throw new UnsupportedCellCodecException(className, e); 197 } 198 if (!connectionHeader.hasCellBlockCompressorClass()) { 199 return; 200 } 201 className = connectionHeader.getCellBlockCompressorClass(); 202 try { 203 this.compressionCodec = 204 (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance(); 205 } catch (Exception e) { 206 throw new UnsupportedCompressionCodecException(className, e); 207 } 208 } 209 210 /** 211 * Set up cipher for rpc encryption with Apache Commons Crypto. 212 */ 213 private Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> setupCryptoCipher() 214 throws FatalConnectionException { 215 // If simple auth, return 216 if (saslServer == null) { 217 return null; 218 } 219 // check if rpc encryption with Crypto AES 220 String qop = saslServer.getNegotiatedQop(); 221 boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop); 222 boolean isCryptoAesEncryption = isEncryption 223 && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false); 224 if (!isCryptoAesEncryption) { 225 return null; 226 } 227 if (!connectionHeader.hasRpcCryptoCipherTransformation()) { 228 return null; 229 } 230 String transformation = connectionHeader.getRpcCryptoCipherTransformation(); 231 if (transformation == null || transformation.length() == 0) { 232 return null; 233 } 234 // Negotiates AES based on complete saslServer. 235 // The Crypto metadata need to be encrypted and send to client. 236 Properties properties = new Properties(); 237 // the property for SecureRandomFactory 238 properties.setProperty(CryptoRandomFactory.CLASSES_KEY, 239 this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random", 240 "org.apache.commons.crypto.random.JavaCryptoRandom")); 241 // the property for cipher class 242 properties.setProperty(CryptoCipherFactory.CLASSES_KEY, 243 this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class", 244 "org.apache.commons.crypto.cipher.JceCipher")); 245 246 int cipherKeyBits = 247 this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128); 248 // generate key and iv 249 if (cipherKeyBits % 8 != 0) { 250 throw new IllegalArgumentException( 251 "The AES cipher key size in bits" + " should be a multiple of byte"); 252 } 253 int len = cipherKeyBits / 8; 254 byte[] inKey = new byte[len]; 255 byte[] outKey = new byte[len]; 256 byte[] inIv = new byte[len]; 257 byte[] outIv = new byte[len]; 258 259 CryptoAES cryptoAES; 260 try { 261 // generate the cipher meta data with SecureRandom 262 CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); 263 secureRandom.nextBytes(inKey); 264 secureRandom.nextBytes(outKey); 265 secureRandom.nextBytes(inIv); 266 secureRandom.nextBytes(outIv); 267 268 // create CryptoAES for server 269 cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv); 270 } catch (GeneralSecurityException | IOException ex) { 271 throw new UnsupportedCryptoException(ex.getMessage(), ex); 272 } 273 // create SaslCipherMeta and send to client, 274 // for client, the [inKey, outKey], [inIv, outIv] should be reversed 275 RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder(); 276 ccmBuilder.setTransformation(transformation); 277 ccmBuilder.setInIv(getByteString(outIv)); 278 ccmBuilder.setInKey(getByteString(outKey)); 279 ccmBuilder.setOutIv(getByteString(inIv)); 280 ccmBuilder.setOutKey(getByteString(inKey)); 281 RPCProtos.ConnectionHeaderResponse resp = 282 RPCProtos.ConnectionHeaderResponse.newBuilder().setCryptoCipherMeta(ccmBuilder).build(); 283 return Pair.newPair(resp, cryptoAES); 284 } 285 286 private ByteString getByteString(byte[] bytes) { 287 // return singleton to reduce object allocation 288 return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); 289 } 290 291 private UserGroupInformation createUser(ConnectionHeader head) { 292 UserGroupInformation ugi = null; 293 294 if (!head.hasUserInfo()) { 295 return null; 296 } 297 UserInformation userInfoProto = head.getUserInfo(); 298 String effectiveUser = null; 299 if (userInfoProto.hasEffectiveUser()) { 300 effectiveUser = userInfoProto.getEffectiveUser(); 301 } 302 String realUser = null; 303 if (userInfoProto.hasRealUser()) { 304 realUser = userInfoProto.getRealUser(); 305 } 306 if (effectiveUser != null) { 307 if (realUser != null) { 308 UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser); 309 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi); 310 } else { 311 ugi = UserGroupInformation.createRemoteUser(effectiveUser); 312 } 313 } 314 return ugi; 315 } 316 317 protected final void disposeSasl() { 318 if (saslServer != null) { 319 saslServer.dispose(); 320 saslServer = null; 321 } 322 } 323 324 /** 325 * No protobuf encoding of raw sasl messages 326 */ 327 protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass, 328 String error) throws IOException { 329 BufferChain bc; 330 // In my testing, have noticed that sasl messages are usually 331 // in the ballpark of 100-200. That's why the initial capacity is 256. 332 try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256); 333 DataOutputStream out = new DataOutputStream(saslResponse)) { 334 out.writeInt(status.state); // write status 335 if (status == SaslStatus.SUCCESS) { 336 rv.write(out); 337 } else { 338 WritableUtils.writeString(out, errorClass); 339 WritableUtils.writeString(out, error); 340 } 341 bc = new BufferChain(saslResponse.getByteBuffer()); 342 } 343 doRespond(() -> bc); 344 } 345 346 HBaseSaslRpcServer getOrCreateSaslServer() throws IOException { 347 if (saslServer == null) { 348 saslServer = new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager); 349 } 350 return saslServer; 351 } 352 353 void finishSaslNegotiation() throws IOException { 354 String qop = saslServer.getNegotiatedQop(); 355 ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager); 356 RpcServer.LOG.debug( 357 "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi, qop); 358 rpcServer.metrics.authenticationSuccess(); 359 RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); 360 } 361 362 public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException { 363 if (connectionHeaderRead) { 364 processRequest(buf); 365 } else { 366 processConnectionHeader(buf); 367 callCleanupIfNeeded(); 368 this.connectionHeaderRead = true; 369 if (rpcServer.needAuthorization() && !authorizeConnection()) { 370 // Throw FatalConnectionException wrapping ACE so client does right thing and closes 371 // down the connection instead of trying to read non-existent retun. 372 throw new AccessDeniedException("Connection from " + this + " for service " 373 + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi); 374 } 375 this.user = this.rpcServer.userProvider.create(this.ugi); 376 } 377 } 378 379 private boolean authorizeConnection() throws IOException { 380 try { 381 // If auth method is DIGEST, the token was obtained by the 382 // real user for the effective user, therefore not required to 383 // authorize real user. doAs is allowed only for simple or kerberos 384 // authentication 385 if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) { 386 ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf); 387 } 388 this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress()); 389 this.rpcServer.metrics.authorizationSuccess(); 390 } catch (AuthorizationException ae) { 391 if (RpcServer.LOG.isDebugEnabled()) { 392 RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae); 393 } 394 this.rpcServer.metrics.authorizationFailure(); 395 doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae))); 396 return false; 397 } 398 return true; 399 } 400 401 private CodedInputStream createCis(ByteBuff buf) { 402 // Here we read in the header. We avoid having pb 403 // do its default 4k allocation for CodedInputStream. We force it to use 404 // backing array. 405 CodedInputStream cis; 406 if (buf.hasArray()) { 407 cis = UnsafeByteOperations 408 .unsafeWrap(buf.array(), buf.arrayOffset() + buf.position(), buf.limit()).newCodedInput(); 409 } else { 410 cis = UnsafeByteOperations.unsafeWrap(new ByteBuffByteInput(buf, buf.limit()), 0, buf.limit()) 411 .newCodedInput(); 412 } 413 cis.enableAliasing(true); 414 return cis; 415 } 416 417 // Reads the connection header following version 418 private void processConnectionHeader(ByteBuff buf) throws IOException { 419 this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf)); 420 421 // we want to copy the attributes prior to releasing the buffer so that they don't get corrupted 422 // eventually 423 if (connectionHeader.getAttributeList().isEmpty()) { 424 this.connectionAttributes = Collections.emptyMap(); 425 } else { 426 this.connectionAttributes = 427 Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size()); 428 for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) { 429 this.connectionAttributes.put(nameBytesPair.getName(), 430 nameBytesPair.getValue().toByteArray()); 431 } 432 } 433 String serviceName = connectionHeader.getServiceName(); 434 if (serviceName == null) { 435 throw new EmptyServiceNameException(); 436 } 437 this.service = RpcServer.getService(this.rpcServer.services, serviceName); 438 if (this.service == null) { 439 throw new UnknownServiceException(serviceName); 440 } 441 setupCellBlockCodecs(); 442 sendConnectionHeaderResponseIfNeeded(); 443 UserGroupInformation protocolUser = createUser(connectionHeader); 444 if (!useSasl) { 445 ugi = protocolUser; 446 if (ugi != null) { 447 ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE); 448 } 449 // audit logging for SASL authenticated users happens in saslReadAndProcess() 450 if (authenticatedWithFallback) { 451 RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi, 452 getHostAddress()); 453 } 454 } else { 455 // user is authenticated 456 ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod()); 457 // Now we check if this is a proxy user case. If the protocol user is 458 // different from the 'user', it is a proxy user scenario. However, 459 // this is not allowed if user authenticated with DIGEST. 460 if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) { 461 if (!provider.supportsProtocolAuthentication()) { 462 // Not allowed to doAs if token authentication is used 463 throw new AccessDeniedException("Authenticated user (" + ugi 464 + ") doesn't match what the client claims to be (" + protocolUser + ")"); 465 } else { 466 // Effective user can be different from authenticated user 467 // for simple auth or kerberos auth 468 // The user is the real user. Now we create a proxy user 469 UserGroupInformation realUser = ugi; 470 ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser); 471 // Now the user is a proxy user, set Authentication method Proxy. 472 ugi.setAuthenticationMethod(AuthenticationMethod.PROXY); 473 } 474 } 475 } 476 String version; 477 if (this.connectionHeader.hasVersionInfo()) { 478 // see if this connection will support RetryImmediatelyException 479 this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2); 480 version = this.connectionHeader.getVersionInfo().getVersion(); 481 } else { 482 version = "UNKNOWN"; 483 } 484 RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}", 485 this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName); 486 } 487 488 /** 489 * Send the response for connection header 490 */ 491 private void sendConnectionHeaderResponseIfNeeded() throws FatalConnectionException { 492 Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> pair = setupCryptoCipher(); 493 // Response the connection header if Crypto AES is enabled 494 if (pair == null) { 495 return; 496 } 497 try { 498 int size = pair.getFirst().getSerializedSize(); 499 BufferChain bc; 500 try (ByteBufferOutputStream bbOut = new ByteBufferOutputStream(4 + size); 501 DataOutputStream out = new DataOutputStream(bbOut)) { 502 out.writeInt(size); 503 pair.getFirst().writeTo(out); 504 bc = new BufferChain(bbOut.getByteBuffer()); 505 } 506 doRespond(new RpcResponse() { 507 508 @Override 509 public BufferChain getResponse() { 510 return bc; 511 } 512 513 @Override 514 public void done() { 515 // must switch after sending the connection header response, as the client still uses the 516 // original SaslClient to unwrap the data we send back 517 saslServer.switchToCryptoAES(pair.getSecond()); 518 } 519 }); 520 } catch (IOException ex) { 521 throw new UnsupportedCryptoException(ex.getMessage(), ex); 522 } 523 } 524 525 protected abstract void doRespond(RpcResponse resp) throws IOException; 526 527 /** 528 * Has the request header and the request param and optionally encoded data buffer all in this one 529 * array. 530 * <p/> 531 * Will be overridden in tests. 532 */ 533 protected void processRequest(ByteBuff buf) throws IOException, InterruptedException { 534 long totalRequestSize = buf.limit(); 535 int offset = 0; 536 // Here we read in the header. We avoid having pb 537 // do its default 4k allocation for CodedInputStream. We force it to use 538 // backing array. 539 CodedInputStream cis = createCis(buf); 540 int headerSize = cis.readRawVarint32(); 541 offset = cis.getTotalBytesRead(); 542 Message.Builder builder = RequestHeader.newBuilder(); 543 ProtobufUtil.mergeFrom(builder, cis, headerSize); 544 RequestHeader header = (RequestHeader) builder.build(); 545 offset += headerSize; 546 Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() 547 .extract(Context.current(), header.getTraceInfo(), getter); 548 549 // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope 550 // are early-exits due to error cases. There's only one success path, the asynchronous call to 551 // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented 552 // by null-ing out the reference in this scope. All other paths end the span. Thus, and in 553 // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally 554 // block iff the span is non-null. 555 Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx); 556 try (Scope ignored = span.makeCurrent()) { 557 int id = header.getCallId(); 558 // HBASE-28128 - if server is aborting, don't bother trying to process. It will 559 // fail at the handler layer, but worse might result in CallQueueTooBigException if the 560 // queue is full but server is not properly processing requests. Better to throw an aborted 561 // exception here so that the client can properly react. 562 if (rpcServer.server != null && rpcServer.server.isAborted()) { 563 RegionServerAbortedException serverIsAborted = new RegionServerAbortedException( 564 "Server " + rpcServer.server.getServerName() + " aborting"); 565 this.rpcServer.metrics.exception(serverIsAborted); 566 sendErrorResponseForCall(id, totalRequestSize, span, serverIsAborted.getMessage(), 567 serverIsAborted); 568 return; 569 } 570 571 if (RpcServer.LOG.isTraceEnabled()) { 572 RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) 573 + " totalRequestSize: " + totalRequestSize + " bytes"); 574 } 575 // Enforcing the call queue size, this triggers a retry in the client 576 // This is a bit late to be doing this check - we have already read in the 577 // total request. 578 if ( 579 (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) 580 > this.rpcServer.maxQueueSizeInBytes 581 ) { 582 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 583 sendErrorResponseForCall(id, totalRequestSize, span, 584 "Call queue is full on " + this.rpcServer.server.getServerName() 585 + ", is hbase.ipc.server.max.callqueue.size too small?", 586 RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 587 return; 588 } 589 MethodDescriptor md = null; 590 Message param = null; 591 ExtendedCellScanner cellScanner = null; 592 try { 593 if (header.hasRequestParam() && header.getRequestParam()) { 594 md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); 595 if (md == null) { 596 throw new UnsupportedOperationException(header.getMethodName()); 597 } 598 builder = this.service.getRequestPrototype(md).newBuilderForType(); 599 cis.resetSizeCounter(); 600 int paramSize = cis.readRawVarint32(); 601 offset += cis.getTotalBytesRead(); 602 if (builder != null) { 603 ProtobufUtil.mergeFrom(builder, cis, paramSize); 604 param = builder.build(); 605 } 606 offset += paramSize; 607 } else { 608 // currently header must have request param, so we directly throw 609 // exception here 610 String msg = "Invalid request header: " + TextFormat.shortDebugString(header) 611 + ", should have param set in it"; 612 RpcServer.LOG.warn(msg); 613 throw new DoNotRetryIOException(msg); 614 } 615 if (header.hasCellBlockMeta()) { 616 buf.position(offset); 617 ByteBuff dup = buf.duplicate(); 618 dup.limit(offset + header.getCellBlockMeta().getLength()); 619 cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec, 620 this.compressionCodec, dup); 621 } 622 } catch (Throwable thrown) { 623 InetSocketAddress address = this.rpcServer.getListenerAddress(); 624 String msg = (address != null ? address : "(channel closed)") 625 + " is unable to read call parameter from client " + getHostAddress(); 626 RpcServer.LOG.warn(msg, thrown); 627 628 this.rpcServer.metrics.exception(thrown); 629 630 final Throwable responseThrowable; 631 if (thrown instanceof LinkageError) { 632 // probably the hbase hadoop version does not match the running hadoop version 633 responseThrowable = new DoNotRetryIOException(thrown); 634 } else if (thrown instanceof UnsupportedOperationException) { 635 // If the method is not present on the server, do not retry. 636 responseThrowable = new DoNotRetryIOException(thrown); 637 } else { 638 responseThrowable = thrown; 639 } 640 641 sendErrorResponseForCall(id, totalRequestSize, span, 642 msg + "; " + responseThrowable.getMessage(), responseThrowable); 643 return; 644 } 645 646 int timeout = 0; 647 if (header.hasTimeout() && header.getTimeout() > 0) { 648 timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); 649 } 650 ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, 651 totalRequestSize, this.addr, timeout, this.callCleanup); 652 653 if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { 654 // unset span do that it's not closed in the finally block 655 span = null; 656 } else { 657 this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); 658 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 659 call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, 660 "Call queue is full on " + this.rpcServer.server.getServerName() 661 + ", too many items queued ?"); 662 TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 663 call.sendResponseIfReady(); 664 } 665 } finally { 666 if (span != null) { 667 span.end(); 668 } 669 } 670 } 671 672 private void sendErrorResponseForCall(int id, long totalRequestSize, Span span, String msg, 673 Throwable responseThrowable) throws IOException { 674 ServerCall<?> failedcall = createCall(id, this.service, null, null, null, null, 675 totalRequestSize, null, 0, this.callCleanup); 676 failedcall.setResponse(null, null, responseThrowable, msg); 677 TraceUtil.setError(span, responseThrowable); 678 failedcall.sendResponseIfReady(); 679 } 680 681 protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException { 682 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1); 683 ServerCall.setExceptionResponse(e, msg, headerBuilder); 684 ByteBuffer headerBuf = 685 ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null); 686 BufferChain buf = new BufferChain(headerBuf); 687 return () -> buf; 688 } 689 690 private void doBadPreambleHandling(String msg) throws IOException { 691 doBadPreambleHandling(msg, new FatalConnectionException(msg)); 692 } 693 694 private void doBadPreambleHandling(String msg, Exception e) throws IOException { 695 RpcServer.LOG.warn(msg, e); 696 doRespond(getErrorResponse(msg, e)); 697 } 698 699 private void doPreambleResponse(Message resp) throws IOException { 700 ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build(); 701 ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null); 702 BufferChain bufChain = new BufferChain(buf); 703 doRespond(() -> bufChain); 704 } 705 706 private boolean doConnectionRegistryResponse() throws IOException { 707 if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) { 708 // should be in tests or some scenarios where we should not reach here 709 return false; 710 } 711 // on backup masters, this request may be blocked since we need to fetch it from filesystem, 712 // but since it is just backup master, it is not a critical problem 713 String clusterId = ((ConnectionRegistryEndpoint) rpcServer.server).getClusterId(); 714 RpcServer.LOG.debug("Response connection registry, clusterId = '{}'", clusterId); 715 if (clusterId == null) { 716 // should be in tests or some scenarios where we should not reach here 717 return false; 718 } 719 GetConnectionRegistryResponse resp = 720 GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build(); 721 doPreambleResponse(resp); 722 return true; 723 } 724 725 private void doSecurityPreambleResponse() throws IOException { 726 if (rpcServer.isSecurityEnabled) { 727 SecurityPreamableResponse resp = SecurityPreamableResponse.newBuilder() 728 .setServerPrincipal(rpcServer.serverPrincipal).build(); 729 doPreambleResponse(resp); 730 } else { 731 // security is not enabled, do not need a principal when connecting, throw a special exception 732 // to let client know it should just use simple authentication 733 doRespond(getErrorResponse("security is not enabled", new SecurityNotEnabledException())); 734 } 735 } 736 737 protected final void callCleanupIfNeeded() { 738 if (callCleanup != null) { 739 callCleanup.run(); 740 callCleanup = null; 741 } 742 } 743 744 protected enum PreambleResponse { 745 SUCCEED, // successfully processed the rpc preamble header 746 CONTINUE, // the preamble header is for other purpose, wait for the rpc preamble header 747 CLOSE // close the rpc connection 748 } 749 750 protected final PreambleResponse processPreamble(ByteBuffer preambleBuffer) throws IOException { 751 assert preambleBuffer.remaining() == 6; 752 if ( 753 ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6, 754 RpcClient.REGISTRY_PREAMBLE_HEADER, 0, 6) && doConnectionRegistryResponse() 755 ) { 756 return PreambleResponse.CLOSE; 757 } 758 if ( 759 ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6, 760 RpcClient.SECURITY_PREAMBLE_HEADER, 0, 6) 761 ) { 762 doSecurityPreambleResponse(); 763 return PreambleResponse.CONTINUE; 764 } 765 if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, RPC_HEADER, 0, 4)) { 766 doBadPreambleHandling( 767 "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" 768 + Bytes.toStringBinary( 769 ByteBufferUtils.toBytes(preambleBuffer, preambleBuffer.position(), RPC_HEADER.length), 770 0, RPC_HEADER.length) 771 + " from " + toString()); 772 return PreambleResponse.CLOSE; 773 } 774 int version = preambleBuffer.get(preambleBuffer.position() + 4) & 0xFF; 775 byte authByte = preambleBuffer.get(preambleBuffer.position() + 5); 776 if (version != RpcServer.CURRENT_VERSION) { 777 String msg = getFatalConnectionString(version, authByte); 778 doBadPreambleHandling(msg, new WrongVersionException(msg)); 779 return PreambleResponse.CLOSE; 780 } 781 782 this.provider = this.saslProviders.selectProvider(authByte); 783 if (this.provider == null) { 784 String msg = getFatalConnectionString(version, authByte); 785 doBadPreambleHandling(msg, new BadAuthException(msg)); 786 return PreambleResponse.CLOSE; 787 } 788 // TODO this is a wart while simple auth'n doesn't go through sasl. 789 if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) { 790 if (this.rpcServer.allowFallbackToSimpleAuth) { 791 this.rpcServer.metrics.authenticationFallback(); 792 authenticatedWithFallback = true; 793 } else { 794 AccessDeniedException ae = new AccessDeniedException("Authentication is required"); 795 doRespond(getErrorResponse(ae.getMessage(), ae)); 796 return PreambleResponse.CLOSE; 797 } 798 } 799 if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) { 800 doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, 801 null); 802 provider = saslProviders.getSimpleProvider(); 803 // client has already sent the initial Sasl message and we 804 // should ignore it. Both client and server should fall back 805 // to simple auth from now on. 806 skipInitialSaslHandshake = true; 807 } 808 useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider); 809 return PreambleResponse.SUCCEED; 810 } 811 812 boolean isSimpleAuthentication() { 813 return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider; 814 } 815 816 public abstract boolean isConnectionOpen(); 817 818 public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md, 819 RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size, 820 InetAddress remoteAddress, int timeout, CallCleanup reqCleanup); 821 822 private static class ByteBuffByteInput extends ByteInput { 823 824 private ByteBuff buf; 825 private int length; 826 827 ByteBuffByteInput(ByteBuff buf, int length) { 828 this.buf = buf; 829 this.length = length; 830 } 831 832 @Override 833 public byte read(int offset) { 834 return this.buf.get(offset); 835 } 836 837 @Override 838 public int read(int offset, byte[] out, int outOffset, int len) { 839 this.buf.get(offset, out, outOffset, len); 840 return len; 841 } 842 843 @Override 844 public int read(int offset, ByteBuffer out) { 845 int len = out.remaining(); 846 this.buf.get(out, offset, len); 847 return len; 848 } 849 850 @Override 851 public int size() { 852 return this.length; 853 } 854 } 855}