/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;

/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
@InterfaceAudience.Private
abstract class ServerRpcConnection implements Closeable {

  private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter();

  protected final RpcServer rpcServer;
  // Whether the connection header has been read yet.
  protected boolean connectionHeaderRead = false;

  protected CallCleanup callCleanup;

  // Cache the remote host & port info so that even if the socket is
  // disconnected, we can say where it used to connect to.
  protected String hostAddress;
  protected int remotePort;
  protected InetAddress addr;
  protected ConnectionHeader connectionHeader;
  protected Map<String, byte[]> connectionAttributes;

  /**
   * Codec the client asked us to use.
   */
  protected Codec codec;
  /**
   * Compression codec the client asked us to use.
   */
  protected CompressionCodec compressionCodec;
  protected BlockingService service;

  protected SaslServerAuthenticationProvider provider;
  protected boolean skipInitialSaslHandshake;
  protected boolean useSasl;
  protected HBaseSaslRpcServer saslServer;

  // was authentication allowed with a fallback to simple auth
  protected boolean authenticatedWithFallback;

  protected boolean retryImmediatelySupported = false;

  protected User user = null;
  protected UserGroupInformation ugi = null;
  protected SaslServerAuthenticationProviders saslProviders = null;
  protected X509Certificate[] clientCertificateChain = null;

  public ServerRpcConnection(RpcServer rpcServer) {
    this.rpcServer = rpcServer;
    this.callCleanup = null;
    this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf());
  }

  @Override
  public String toString() {
    return getHostAddress() + ":" + remotePort;
  }

  public String getHostAddress() {
    return hostAddress;
  }

  public InetAddress getHostInetAddress() {
    return addr;
  }

  public int getRemotePort() {
    return remotePort;
  }

  public VersionInfo getVersionInfo() {
    if (connectionHeader != null && connectionHeader.hasVersionInfo()) {
      return connectionHeader.getVersionInfo();
    }
    return null;
  }

  private String getFatalConnectionString(final int version, final byte authByte) {
    return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version
      + ", authMethod=" + authByte +
      // The provider may be null if we failed to parse the header of the request
      ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName())
      + " from " + toString();
  }

  /**
   * Set up cell block codecs
   */
  private void setupCellBlockCodecs() throws FatalConnectionException {
    // TODO: Plug in other supported decoders.
    if (!connectionHeader.hasCellBlockCodecClass()) {
      return;
    }
    String className = connectionHeader.getCellBlockCodecClass();
    if (className == null || className.length() == 0) {
      return;
    }
    try {
      this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new UnsupportedCellCodecException(className, e);
    }
    if (!connectionHeader.hasCellBlockCompressorClass()) {
      return;
    }
    className = connectionHeader.getCellBlockCompressorClass();
    try {
      this.compressionCodec =
        (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new UnsupportedCompressionCodecException(className, e);
    }
  }

  /**
   * Set up cipher for rpc encryption with Apache Commons Crypto.
   */
  private Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> setupCryptoCipher()
    throws FatalConnectionException {
    // If simple auth, return
    if (saslServer == null) {
      return null;
    }
    // check if rpc encryption with Crypto AES
    String qop = saslServer.getNegotiatedQop();
    boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop);
    boolean isCryptoAesEncryption = isEncryption
      && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false);
    if (!isCryptoAesEncryption) {
      return null;
    }
    if (!connectionHeader.hasRpcCryptoCipherTransformation()) {
      return null;
    }
    String transformation = connectionHeader.getRpcCryptoCipherTransformation();
    if (transformation == null || transformation.length() == 0) {
      return null;
    }
    // Negotiate AES now that the sasl handshake is complete.
    // The crypto metadata needs to be encrypted and sent to the client.
    Properties properties = new Properties();
    // the property for SecureRandomFactory
    properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
      this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
        "org.apache.commons.crypto.random.JavaCryptoRandom"));
    // the property for cipher class
    properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
      this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
        "org.apache.commons.crypto.cipher.JceCipher"));

    int cipherKeyBits =
      this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
    // generate key and iv
    if (cipherKeyBits % 8 != 0) {
      throw new IllegalArgumentException(
        "The AES cipher key size in bits" + " should be a multiple of byte");
    }
    int len = cipherKeyBits / 8;
    byte[] inKey = new byte[len];
    byte[] outKey = new byte[len];
    byte[] inIv = new byte[len];
    byte[] outIv = new byte[len];

    CryptoAES cryptoAES;
    try {
      // generate the cipher meta data with SecureRandom
      CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
      secureRandom.nextBytes(inKey);
      secureRandom.nextBytes(outKey);
      secureRandom.nextBytes(inIv);
      secureRandom.nextBytes(outIv);

      // create CryptoAES for server
      cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv);
    } catch (GeneralSecurityException | IOException ex) {
      throw new UnsupportedCryptoException(ex.getMessage(), ex);
    }
    // create SaslCipherMeta and send to client,
    // for client, the [inKey, outKey], [inIv, outIv] should be reversed
    RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
    ccmBuilder.setTransformation(transformation);
    ccmBuilder.setInIv(getByteString(outIv));
    ccmBuilder.setInKey(getByteString(outKey));
    ccmBuilder.setOutIv(getByteString(inIv));
    ccmBuilder.setOutKey(getByteString(inKey));
    RPCProtos.ConnectionHeaderResponse resp =
      RPCProtos.ConnectionHeaderResponse.newBuilder().setCryptoCipherMeta(ccmBuilder).build();
    return Pair.newPair(resp, cryptoAES);
  }

  private ByteString getByteString(byte[] bytes) {
    // return singleton to reduce object allocation
    return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
  }

  private UserGroupInformation createUser(ConnectionHeader head) {
    UserGroupInformation ugi = null;

    if (!head.hasUserInfo()) {
      return null;
    }
    UserInformation userInfoProto = head.getUserInfo();
    String effectiveUser = null;
    if (userInfoProto.hasEffectiveUser()) {
      effectiveUser = userInfoProto.getEffectiveUser();
    }
    String realUser = null;
    if (userInfoProto.hasRealUser()) {
      realUser = userInfoProto.getRealUser();
    }
    if (effectiveUser != null) {
      if (realUser != null) {
        UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser);
        ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
      } else {
        ugi = UserGroupInformation.createRemoteUser(effectiveUser);
      }
    }
    return ugi;
  }

  protected final void disposeSasl() {
    if (saslServer != null) {
      saslServer.dispose();
      saslServer = null;
    }
  }

  /**
   * No protobuf encoding of raw sasl messages
   */
  protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass,
    String error) throws IOException {
    BufferChain bc;
    // In testing, sasl messages have usually been in the ballpark of 100-200 bytes.
    // That's why the initial capacity is 256.
    try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256);
      DataOutputStream out = new DataOutputStream(saslResponse)) {
      out.writeInt(status.state); // write status
      if (status == SaslStatus.SUCCESS) {
        rv.write(out);
      } else {
        WritableUtils.writeString(out, errorClass);
        WritableUtils.writeString(out, error);
      }
      bc = new BufferChain(saslResponse.getByteBuffer());
    }
    doRespond(() -> bc);
  }

  HBaseSaslRpcServer getOrCreateSaslServer() throws IOException {
    if (saslServer == null) {
      saslServer = new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager);
    }
    return saslServer;
  }

  void finishSaslNegotiation() throws IOException {
    String qop = saslServer.getNegotiatedQop();
    ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager);
    RpcServer.LOG.debug(
      "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi, qop);
    rpcServer.metrics.authenticationSuccess();
    RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi);
  }
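
  // Called once per fully-read request buffer: the first buffer on a connection carries the
  // ConnectionHeader (service name, codec/compression classes, user info, attributes); every
  // buffer after that is a regular call and goes through processRequest.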
  public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
    if (connectionHeaderRead) {
      processRequest(buf);
    } else {
      processConnectionHeader(buf);
      callCleanupIfNeeded();
      this.connectionHeaderRead = true;
      this.rpcServer.getRpcCoprocessorHost().preAuthorizeConnection(connectionHeader, addr);
      if (rpcServer.needAuthorization() && !authorizeConnection()) {
        // Throw FatalConnectionException wrapping ACE so client does right thing and closes
        // down the connection instead of trying to read a non-existent return.
        throw new AccessDeniedException("Connection from " + this + " for service "
          + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
      }
      this.user = this.rpcServer.userProvider.create(this.ugi);
      this.rpcServer.getRpcCoprocessorHost().postAuthorizeConnection(
        this.user != null ? this.user.getName() : null, this.clientCertificateChain);
    }
  }

  private boolean authorizeConnection() throws IOException {
    try {
      // If auth method is DIGEST, the token was obtained by the
      // real user for the effective user, therefore not required to
      // authorize real user. doAs is allowed only for simple or kerberos
      // authentication
      if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) {
        ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf);
      }
      this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress());
      this.rpcServer.metrics.authorizationSuccess();
    } catch (AuthorizationException ae) {
      if (RpcServer.LOG.isDebugEnabled()) {
        RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
      }
      this.rpcServer.metrics.authorizationFailure();
      doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae)));
      return false;
    }
    return true;
  }

  private CodedInputStream createCis(ByteBuff buf) {
    // Here we read in the header. We avoid having pb
    // do its default 4k allocation for CodedInputStream. We force it to use
    // backing array.
    CodedInputStream cis;
    if (buf.hasArray()) {
      cis = UnsafeByteOperations
        .unsafeWrap(buf.array(), buf.arrayOffset() + buf.position(), buf.limit()).newCodedInput();
    } else {
      cis = UnsafeByteOperations.unsafeWrap(new ByteBuffByteInput(buf, buf.limit()), 0, buf.limit())
        .newCodedInput();
    }
    cis.enableAliasing(true);
    return cis;
  }

  // Reads the connection header that follows the version byte of the preamble
  private void processConnectionHeader(ByteBuff buf) throws IOException {
    this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf));

    // we want to copy the attributes prior to releasing the buffer so that they don't get
    // corrupted eventually
    if (connectionHeader.getAttributeList().isEmpty()) {
      this.connectionAttributes = Collections.emptyMap();
    } else {
      this.connectionAttributes =
        Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size());
      for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) {
        this.connectionAttributes.put(nameBytesPair.getName(),
          nameBytesPair.getValue().toByteArray());
      }
    }
    String serviceName = connectionHeader.getServiceName();
    if (serviceName == null) {
      throw new EmptyServiceNameException();
    }
    this.service = RpcServer.getService(this.rpcServer.services, serviceName);
    if (this.service == null) {
      throw new UnknownServiceException(serviceName);
    }
    setupCellBlockCodecs();
    sendConnectionHeaderResponseIfNeeded();
    UserGroupInformation protocolUser = createUser(connectionHeader);
    if (!useSasl) {
      ugi = protocolUser;
      if (ugi != null) {
        ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
      }
      // audit logging for SASL authenticated users happens in saslReadAndProcess()
      if (authenticatedWithFallback) {
        RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi,
          getHostAddress());
      }
    } else {
      // user is authenticated
      ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod());
      // Now we check if this is a proxy user case. If the protocol user is
      // different from the 'user', it is a proxy user scenario. However,
      // this is not allowed if the user authenticated with DIGEST.
      if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
        if (!provider.supportsProtocolAuthentication()) {
          // Not allowed to doAs if token authentication is used
          throw new AccessDeniedException("Authenticated user (" + ugi
            + ") doesn't match what the client claims to be (" + protocolUser + ")");
        } else {
          // Effective user can be different from authenticated user
          // for simple auth or kerberos auth
          // The user is the real user. Now we create a proxy user
          UserGroupInformation realUser = ugi;
          ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser);
          // Now the user is a proxy user, set Authentication method Proxy.
          ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
        }
      }
    }
    String version;
    if (this.connectionHeader.hasVersionInfo()) {
      // see if this connection will support RetryImmediatelyException
      this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
      version = this.connectionHeader.getVersionInfo().getVersion();
    } else {
      version = "UNKNOWN";
    }
    RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}",
      this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName);
  }

  /**
   * Send the response for the connection header
   */
  private void sendConnectionHeaderResponseIfNeeded() throws FatalConnectionException {
    Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> pair = setupCryptoCipher();
    // Only send a connection header response if Crypto AES is enabled
    if (pair == null) {
      return;
    }
    try {
      int size = pair.getFirst().getSerializedSize();
      BufferChain bc;
      try (ByteBufferOutputStream bbOut = new ByteBufferOutputStream(4 + size);
        DataOutputStream out = new DataOutputStream(bbOut)) {
        out.writeInt(size);
        pair.getFirst().writeTo(out);
        bc = new BufferChain(bbOut.getByteBuffer());
      }
      doRespond(new RpcResponse() {

        @Override
        public BufferChain getResponse() {
          return bc;
        }

        @Override
        public void done() {
          // must switch after sending the connection header response, as the client still uses the
          // original SaslClient to unwrap the data we send back
          saslServer.switchToCryptoAES(pair.getSecond());
        }
      });
    } catch (IOException ex) {
      throw new UnsupportedCryptoException(ex.getMessage(), ex);
    }
  }

  protected abstract void doRespond(RpcResponse resp) throws IOException;

  /**
   * Has the request header and the request param and optionally encoded data buffer all in this
   * one array.
   * <p/>
   * Will be overridden in tests.
   */
  protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
    long totalRequestSize = buf.limit();
    int offset = 0;
    // Here we read in the header. We avoid having pb
    // do its default 4k allocation for CodedInputStream. We force it to use
    // backing array.
    CodedInputStream cis = createCis(buf);
    int headerSize = cis.readRawVarint32();
    offset = cis.getTotalBytesRead();
    Message.Builder builder = RequestHeader.newBuilder();
    ProtobufUtil.mergeFrom(builder, cis, headerSize);
    RequestHeader header = (RequestHeader) builder.build();
    offset += headerSize;
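    // The client propagates its OpenTelemetry context in the RPCTInfo field of the request
    // header; extracting it here lets the server-side span join the client's trace.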
    Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
      .extract(Context.current(), header.getTraceInfo(), getter);

    // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope
    // are early-exits due to error cases. There's only one success path, the asynchronous call to
    // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented
    // by null-ing out the reference in this scope. All other paths end the span. Thus, and in
    // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally
    // block iff the span is non-null.
    Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
    try (Scope ignored = span.makeCurrent()) {
      int id = header.getCallId();
      // HBASE-28128 - if server is aborting, don't bother trying to process. It will
      // fail at the handler layer, but worse might result in CallQueueTooBigException if the
      // queue is full but server is not properly processing requests. Better to throw an aborted
      // exception here so that the client can properly react.
      if (rpcServer.server != null && rpcServer.server.isAborted()) {
        RegionServerAbortedException serverIsAborted = new RegionServerAbortedException(
          "Server " + rpcServer.server.getServerName() + " aborting");
        this.rpcServer.metrics.exception(serverIsAborted);
        sendErrorResponseForCall(id, totalRequestSize, span, serverIsAborted.getMessage(),
          serverIsAborted);
        return;
      }

      if (RpcServer.LOG.isTraceEnabled()) {
        RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
          + " totalRequestSize: " + totalRequestSize + " bytes");
      }
      // Enforcing the call queue size, this triggers a retry in the client
      // This is a bit late to be doing this check - we have already read in the
      // total request.
      if (
        (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum())
            > this.rpcServer.maxQueueSizeInBytes
      ) {
        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        sendErrorResponseForCall(id, totalRequestSize, span,
          "Call queue is full on " + this.rpcServer.server.getServerName()
            + ", is hbase.ipc.server.max.callqueue.size too small?",
          RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        return;
      }
      MethodDescriptor md = null;
      Message param = null;
      ExtendedCellScanner cellScanner = null;
      try {
        if (header.hasRequestParam() && header.getRequestParam()) {
          md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
          if (md == null) {
            throw new UnsupportedOperationException(header.getMethodName());
          }
          builder = this.service.getRequestPrototype(md).newBuilderForType();
          cis.resetSizeCounter();
          int paramSize = cis.readRawVarint32();
          offset += cis.getTotalBytesRead();
          if (builder != null) {
            ProtobufUtil.mergeFrom(builder, cis, paramSize);
            param = builder.build();
          }
          offset += paramSize;
        } else {
          // currently header must have request param, so we directly throw
          // exception here
          String msg = "Invalid request header: " + TextFormat.shortDebugString(header)
            + ", should have param set in it";
          RpcServer.LOG.warn(msg);
          throw new DoNotRetryIOException(msg);
        }
        if (header.hasCellBlockMeta()) {
          buf.position(offset);
          ByteBuff dup = buf.duplicate();
          dup.limit(offset + header.getCellBlockMeta().getLength());
          cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
            this.compressionCodec, dup);
        }
      } catch (Throwable thrown) {
        InetSocketAddress address = this.rpcServer.getListenerAddress();
        String msg = (address != null ? address : "(channel closed)")
          + " is unable to read call parameter from client " + getHostAddress();
        RpcServer.LOG.warn(msg, thrown);

        this.rpcServer.metrics.exception(thrown);

        final Throwable responseThrowable;
        if (thrown instanceof LinkageError) {
          // probably the hbase hadoop version does not match the running hadoop version
          responseThrowable = new DoNotRetryIOException(thrown);
        } else if (thrown instanceof UnsupportedOperationException) {
          // If the method is not present on the server, do not retry.
          responseThrowable = new DoNotRetryIOException(thrown);
        } else {
          responseThrowable = thrown;
        }

        sendErrorResponseForCall(id, totalRequestSize, span,
          msg + "; " + responseThrowable.getMessage(), responseThrowable);
        return;
      }

      int timeout = 0;
      if (header.hasTimeout() && header.getTimeout() > 0) {
        timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
      }
      ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner,
        totalRequestSize, this.addr, timeout, this.callCleanup);

      if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
        // unset the span so that it's not closed in the finally block
        span = null;
      } else {
        this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
        this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
          "Call queue is full on " + this.rpcServer.server.getServerName()
            + ", too many items queued ?");
        TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
        call.sendResponseIfReady();
      }
    } finally {
      if (span != null) {
        span.end();
      }
    }
  }

  private void sendErrorResponseForCall(int id, long totalRequestSize, Span span, String msg,
    Throwable responseThrowable) throws IOException {
    ServerCall<?> failedcall = createCall(id, this.service, null, null, null, null,
      totalRequestSize, null, 0, this.callCleanup);
    failedcall.setResponse(null, null, responseThrowable, msg);
    TraceUtil.setError(span, responseThrowable);
    failedcall.sendResponseIfReady();
  }

  protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException {
    ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1);
    ServerCall.setExceptionResponse(e, msg, headerBuilder);
    ByteBuffer headerBuf =
      ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null);
    BufferChain buf = new BufferChain(headerBuf);
    return () -> buf;
  }

  private void doBadPreambleHandling(String msg) throws IOException {
    doBadPreambleHandling(msg, new FatalConnectionException(msg));
  }

  private void doBadPreambleHandling(String msg, Exception e) throws IOException {
    RpcServer.LOG.warn(msg, e);
    doRespond(getErrorResponse(msg, e));
  }

  private void doPreambleResponse(Message resp) throws IOException {
    ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build();
    ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null);
    BufferChain bufChain = new BufferChain(buf);
    doRespond(() -> bufChain);
  }
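
  // A client may send the special connection-registry preamble instead of the normal RPC preamble
  // in order to fetch the cluster id without setting up a full connection; this builds that
  // response (see processPreamble below).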
  private boolean doConnectionRegistryResponse() throws IOException {
    if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) {
      // should be in tests or some scenarios where we should not reach here
      return false;
    }
    // on backup masters, this request may be blocked since we need to fetch it from the
    // filesystem, but since it is just a backup master, it is not a critical problem
    String clusterId = ((ConnectionRegistryEndpoint) rpcServer.server).getClusterId();
    RpcServer.LOG.debug("Response connection registry, clusterId = '{}'", clusterId);
    if (clusterId == null) {
      // should be in tests or some scenarios where we should not reach here
      return false;
    }
    GetConnectionRegistryResponse resp =
      GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build();
    doPreambleResponse(resp);
    return true;
  }

  private void doSecurityPreambleResponse() throws IOException {
    if (rpcServer.isSecurityEnabled) {
      SecurityPreamableResponse resp = SecurityPreamableResponse.newBuilder()
        .setServerPrincipal(rpcServer.serverPrincipal).build();
      doPreambleResponse(resp);
    } else {
      // security is not enabled, so a principal is not needed when connecting; throw a special
      // exception to let the client know it should just use simple authentication
      doRespond(getErrorResponse("security is not enabled", new SecurityNotEnabledException()));
    }
  }

  protected final void callCleanupIfNeeded() {
    if (callCleanup != null) {
      callCleanup.run();
      callCleanup = null;
    }
  }

  protected enum PreambleResponse {
    SUCCEED, // successfully processed the rpc preamble header
    CONTINUE, // the preamble header is for another purpose, wait for the rpc preamble header
    CLOSE // close the rpc connection
  }

  protected final PreambleResponse processPreamble(ByteBuffer preambleBuffer) throws IOException {
    assert preambleBuffer.remaining() == 6;
    if (
      ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6,
        RpcClient.REGISTRY_PREAMBLE_HEADER, 0, 6) && doConnectionRegistryResponse()
    ) {
      return PreambleResponse.CLOSE;
    }
    if (
      ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6,
        RpcClient.SECURITY_PREAMBLE_HEADER, 0, 6)
    ) {
      doSecurityPreambleResponse();
      return PreambleResponse.CONTINUE;
    }
    if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, RPC_HEADER, 0, 4)) {
      doBadPreambleHandling(
        "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER="
          + Bytes.toStringBinary(
            ByteBufferUtils.toBytes(preambleBuffer, preambleBuffer.position(), RPC_HEADER.length),
            0, RPC_HEADER.length)
          + " from " + toString());
      return PreambleResponse.CLOSE;
    }
    int version = preambleBuffer.get(preambleBuffer.position() + 4) & 0xFF;
    byte authByte = preambleBuffer.get(preambleBuffer.position() + 5);
    if (version != RpcServer.CURRENT_VERSION) {
      String msg = getFatalConnectionString(version, authByte);
      doBadPreambleHandling(msg, new WrongVersionException(msg));
      return PreambleResponse.CLOSE;
    }

    this.provider = this.saslProviders.selectProvider(authByte);
    if (this.provider == null) {
      String msg = getFatalConnectionString(version, authByte);
      doBadPreambleHandling(msg, new BadAuthException(msg));
      return PreambleResponse.CLOSE;
    }
    // TODO this is a wart while simple auth'n doesn't go through sasl.
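    // If security is enabled but the client asked for SIMPLE auth, either fall back to it (when
    // fallback is allowed) or reject the connection; if security is disabled but the client asked
    // for SASL, tell it to switch to SIMPLE instead.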
    if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) {
      if (this.rpcServer.allowFallbackToSimpleAuth) {
        this.rpcServer.metrics.authenticationFallback();
        authenticatedWithFallback = true;
      } else {
        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
        doRespond(getErrorResponse(ae.getMessage(), ae));
        return PreambleResponse.CLOSE;
      }
    }
    if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) {
      doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
        null);
      provider = saslProviders.getSimpleProvider();
      // client has already sent the initial Sasl message and we
      // should ignore it. Both client and server should fall back
      // to simple auth from now on.
      skipInitialSaslHandshake = true;
    }
    useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider);
    return PreambleResponse.SUCCEED;
  }

  boolean isSimpleAuthentication() {
    return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider;
  }

  public abstract boolean isConnectionOpen();

  public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md,
    RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size,
    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup);

  /**
   * Adapts a {@link ByteBuff} to protobuf's {@link ByteInput} so a CodedInputStream can read it
   * without first copying the bytes into an on-heap array.
   */
  private static class ByteBuffByteInput extends ByteInput {

    private ByteBuff buf;
    private int length;

    ByteBuffByteInput(ByteBuff buf, int length) {
      this.buf = buf;
      this.length = length;
    }

    @Override
    public byte read(int offset) {
      return this.buf.get(offset);
    }

    @Override
    public int read(int offset, byte[] out, int outOffset, int len) {
      this.buf.get(offset, out, outOffset, len);
      return len;
    }

    @Override
    public int read(int offset, ByteBuffer out) {
      int len = out.remaining();
      this.buf.get(out, offset, len);
      return len;
    }

    @Override
    public int size() {
      return this.length;
    }
  }
}