001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER; 021 022import io.opentelemetry.api.GlobalOpenTelemetry; 023import io.opentelemetry.api.trace.Span; 024import io.opentelemetry.context.Context; 025import io.opentelemetry.context.Scope; 026import io.opentelemetry.context.propagation.TextMapGetter; 027import java.io.Closeable; 028import java.io.DataOutputStream; 029import java.io.IOException; 030import java.net.InetAddress; 031import java.net.InetSocketAddress; 032import java.nio.ByteBuffer; 033import java.security.GeneralSecurityException; 034import java.security.cert.X509Certificate; 035import java.util.Collections; 036import java.util.Map; 037import java.util.Objects; 038import java.util.Properties; 039import org.apache.commons.crypto.cipher.CryptoCipherFactory; 040import org.apache.commons.crypto.random.CryptoRandom; 041import org.apache.commons.crypto.random.CryptoRandomFactory; 042import org.apache.hadoop.hbase.DoNotRetryIOException; 043import org.apache.hadoop.hbase.ExtendedCellScanner; 044import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint; 045import org.apache.hadoop.hbase.client.VersionInfoUtil; 046import org.apache.hadoop.hbase.codec.Codec; 047import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 048import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 049import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 050import org.apache.hadoop.hbase.nio.ByteBuff; 051import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; 052import org.apache.hadoop.hbase.security.AccessDeniedException; 053import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; 054import org.apache.hadoop.hbase.security.SaslStatus; 055import org.apache.hadoop.hbase.security.SaslUtil; 056import org.apache.hadoop.hbase.security.User; 057import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider; 058import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider; 059import org.apache.hadoop.hbase.trace.TraceUtil; 060import org.apache.hadoop.hbase.util.ByteBufferUtils; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.Pair; 063import org.apache.hadoop.io.IntWritable; 064import org.apache.hadoop.io.Writable; 065import org.apache.hadoop.io.WritableUtils; 066import org.apache.hadoop.io.compress.CompressionCodec; 067import org.apache.hadoop.security.UserGroupInformation; 068import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 069import org.apache.hadoop.security.authorize.AuthorizationException; 070import org.apache.hadoop.security.authorize.ProxyUsers; 071import org.apache.yetus.audience.InterfaceAudience; 072 073import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 074import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 075import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; 076import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 077import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 078import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 079import org.apache.hbase.thirdparty.com.google.protobuf.Message; 080import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 081import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 082 083import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; 094 095/** Reads calls from a connection and queues them for handling. */ 096@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", 097 justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") 098@InterfaceAudience.Private 099public abstract class ServerRpcConnection implements Closeable { 100 101 private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter(); 102 103 protected final RpcServer rpcServer; 104 // If the connection header has been read or not. 105 protected boolean connectionHeaderRead = false; 106 107 protected CallCleanup callCleanup; 108 109 // Cache the remote host & port info so that even if the socket is 110 // disconnected, we can say where it used to connect to. 111 protected String hostAddress; 112 protected int remotePort; 113 protected InetAddress addr; 114 protected ConnectionHeader connectionHeader; 115 protected Map<String, byte[]> connectionAttributes; 116 117 /** 118 * Codec the client asked use. 119 */ 120 protected Codec codec; 121 /** 122 * Compression codec the client asked us use. 123 */ 124 protected CompressionCodec compressionCodec; 125 protected BlockingService service; 126 127 protected SaslServerAuthenticationProvider provider; 128 protected boolean skipInitialSaslHandshake; 129 protected boolean useSasl; 130 protected HBaseSaslRpcServer saslServer; 131 132 // was authentication allowed with a fallback to simple auth 133 protected boolean authenticatedWithFallback; 134 135 protected boolean retryImmediatelySupported = false; 136 137 protected User user = null; 138 protected UserGroupInformation ugi = null; 139 protected X509Certificate[] clientCertificateChain = null; 140 141 public ServerRpcConnection(RpcServer rpcServer) { 142 this.rpcServer = rpcServer; 143 this.callCleanup = null; 144 } 145 146 @Override 147 public String toString() { 148 return getHostAddress() + ":" + remotePort; 149 } 150 151 public String getHostAddress() { 152 return hostAddress; 153 } 154 155 public InetAddress getHostInetAddress() { 156 return addr; 157 } 158 159 public int getRemotePort() { 160 return remotePort; 161 } 162 163 public VersionInfo getVersionInfo() { 164 if (connectionHeader != null && connectionHeader.hasVersionInfo()) { 165 return connectionHeader.getVersionInfo(); 166 } 167 return null; 168 } 169 170 private String getFatalConnectionString(final int version, final byte authByte) { 171 return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version 172 + ", authMethod=" + authByte + 173 // The provider may be null if we failed to parse the header of the request 174 ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName()) 175 + " from " + toString(); 176 } 177 178 /** 179 * Set up cell block codecs 180 */ 181 private void setupCellBlockCodecs() throws FatalConnectionException { 182 // TODO: Plug in other supported decoders. 183 if (!connectionHeader.hasCellBlockCodecClass()) { 184 return; 185 } 186 String className = connectionHeader.getCellBlockCodecClass(); 187 if (className == null || className.length() == 0) { 188 return; 189 } 190 try { 191 this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance(); 192 } catch (Exception e) { 193 throw new UnsupportedCellCodecException(className, e); 194 } 195 if (!connectionHeader.hasCellBlockCompressorClass()) { 196 return; 197 } 198 className = connectionHeader.getCellBlockCompressorClass(); 199 try { 200 this.compressionCodec = 201 (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance(); 202 } catch (Exception e) { 203 throw new UnsupportedCompressionCodecException(className, e); 204 } 205 } 206 207 /** 208 * Set up cipher for rpc encryption with Apache Commons Crypto. 209 */ 210 private Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> setupCryptoCipher() 211 throws FatalConnectionException { 212 // If simple auth, return 213 if (saslServer == null) { 214 return null; 215 } 216 // check if rpc encryption with Crypto AES 217 String qop = saslServer.getNegotiatedQop(); 218 boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop); 219 boolean isCryptoAesEncryption = isEncryption 220 && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false); 221 if (!isCryptoAesEncryption) { 222 return null; 223 } 224 if (!connectionHeader.hasRpcCryptoCipherTransformation()) { 225 return null; 226 } 227 String transformation = connectionHeader.getRpcCryptoCipherTransformation(); 228 if (transformation == null || transformation.length() == 0) { 229 return null; 230 } 231 // Negotiates AES based on complete saslServer. 232 // The Crypto metadata need to be encrypted and send to client. 233 Properties properties = new Properties(); 234 // the property for SecureRandomFactory 235 properties.setProperty(CryptoRandomFactory.CLASSES_KEY, 236 this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random", 237 "org.apache.commons.crypto.random.JavaCryptoRandom")); 238 // the property for cipher class 239 properties.setProperty(CryptoCipherFactory.CLASSES_KEY, 240 this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class", 241 "org.apache.commons.crypto.cipher.JceCipher")); 242 243 int cipherKeyBits = 244 this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128); 245 // generate key and iv 246 if (cipherKeyBits % 8 != 0) { 247 throw new IllegalArgumentException( 248 "The AES cipher key size in bits" + " should be a multiple of byte"); 249 } 250 int len = cipherKeyBits / 8; 251 byte[] inKey = new byte[len]; 252 byte[] outKey = new byte[len]; 253 byte[] inIv = new byte[len]; 254 byte[] outIv = new byte[len]; 255 256 CryptoAES cryptoAES; 257 try { 258 // generate the cipher meta data with SecureRandom 259 CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); 260 secureRandom.nextBytes(inKey); 261 secureRandom.nextBytes(outKey); 262 secureRandom.nextBytes(inIv); 263 secureRandom.nextBytes(outIv); 264 265 // create CryptoAES for server 266 cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv); 267 } catch (GeneralSecurityException | IOException ex) { 268 throw new UnsupportedCryptoException(ex.getMessage(), ex); 269 } 270 // create SaslCipherMeta and send to client, 271 // for client, the [inKey, outKey], [inIv, outIv] should be reversed 272 RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder(); 273 ccmBuilder.setTransformation(transformation); 274 ccmBuilder.setInIv(getByteString(outIv)); 275 ccmBuilder.setInKey(getByteString(outKey)); 276 ccmBuilder.setOutIv(getByteString(inIv)); 277 ccmBuilder.setOutKey(getByteString(inKey)); 278 RPCProtos.ConnectionHeaderResponse resp = 279 RPCProtos.ConnectionHeaderResponse.newBuilder().setCryptoCipherMeta(ccmBuilder).build(); 280 return Pair.newPair(resp, cryptoAES); 281 } 282 283 private ByteString getByteString(byte[] bytes) { 284 // return singleton to reduce object allocation 285 return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); 286 } 287 288 private UserGroupInformation createUser(ConnectionHeader head) { 289 UserGroupInformation ugi = null; 290 291 if (!head.hasUserInfo()) { 292 return null; 293 } 294 UserInformation userInfoProto = head.getUserInfo(); 295 String effectiveUser = null; 296 if (userInfoProto.hasEffectiveUser()) { 297 effectiveUser = userInfoProto.getEffectiveUser(); 298 } 299 String realUser = null; 300 if (userInfoProto.hasRealUser()) { 301 realUser = userInfoProto.getRealUser(); 302 } 303 if (effectiveUser != null) { 304 if (realUser != null) { 305 UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser); 306 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi); 307 } else { 308 ugi = UserGroupInformation.createRemoteUser(effectiveUser); 309 } 310 } 311 return ugi; 312 } 313 314 protected final void disposeSasl() { 315 if (saslServer != null) { 316 saslServer.dispose(); 317 saslServer = null; 318 } 319 } 320 321 /** 322 * No protobuf encoding of raw sasl messages 323 */ 324 protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass, 325 String error) throws IOException { 326 BufferChain bc; 327 // In my testing, have noticed that sasl messages are usually 328 // in the ballpark of 100-200. That's why the initial capacity is 256. 329 try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256); 330 DataOutputStream out = new DataOutputStream(saslResponse)) { 331 out.writeInt(status.state); // write status 332 if (status == SaslStatus.SUCCESS) { 333 rv.write(out); 334 } else { 335 WritableUtils.writeString(out, errorClass); 336 WritableUtils.writeString(out, error); 337 } 338 bc = new BufferChain(saslResponse.getByteBuffer()); 339 } 340 doRespond(() -> bc); 341 } 342 343 HBaseSaslRpcServer getOrCreateSaslServer() throws IOException { 344 if (saslServer == null) { 345 saslServer = new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager); 346 } 347 return saslServer; 348 } 349 350 void finishSaslNegotiation() throws IOException { 351 String negotiatedQop = saslServer.getNegotiatedQop(); 352 SaslUtil.verifyNegotiatedQop(saslServer.getRequestedQop(), negotiatedQop); 353 ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager); 354 RpcServer.LOG.debug( 355 "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi, 356 negotiatedQop); 357 rpcServer.metrics.authenticationSuccess(); 358 RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); 359 } 360 361 public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException { 362 if (connectionHeaderRead) { 363 processRequest(buf); 364 } else { 365 processConnectionHeader(buf); 366 callCleanupIfNeeded(); 367 this.connectionHeaderRead = true; 368 this.rpcServer.getRpcCoprocessorHost().preAuthorizeConnection(connectionHeader, addr); 369 if (rpcServer.needAuthorization() && !authorizeConnection()) { 370 // Throw FatalConnectionException wrapping ACE so client does right thing and closes 371 // down the connection instead of trying to read non-existent retun. 372 throw new AccessDeniedException("Connection from " + this + " for service " 373 + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi); 374 } 375 this.user = this.rpcServer.userProvider.create(this.ugi); 376 this.rpcServer.getRpcCoprocessorHost().postAuthorizeConnection( 377 this.user != null ? this.user.getName() : null, this.clientCertificateChain); 378 } 379 } 380 381 private boolean authorizeConnection() throws IOException { 382 try { 383 // If auth method is DIGEST, the token was obtained by the 384 // real user for the effective user, therefore not required to 385 // authorize real user. doAs is allowed only for simple or kerberos 386 // authentication 387 if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) { 388 ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf); 389 } 390 this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress()); 391 this.rpcServer.metrics.authorizationSuccess(); 392 } catch (AuthorizationException ae) { 393 if (RpcServer.LOG.isDebugEnabled()) { 394 RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae); 395 } 396 this.rpcServer.metrics.authorizationFailure(); 397 doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae))); 398 return false; 399 } 400 return true; 401 } 402 403 private CodedInputStream createCis(ByteBuff buf) { 404 // Here we read in the header. We avoid having pb 405 // do its default 4k allocation for CodedInputStream. We force it to use 406 // backing array. 407 CodedInputStream cis; 408 if (buf.hasArray()) { 409 cis = UnsafeByteOperations 410 .unsafeWrap(buf.array(), buf.arrayOffset() + buf.position(), buf.limit()).newCodedInput(); 411 } else { 412 cis = UnsafeByteOperations.unsafeWrap(new ByteBuffByteInput(buf, buf.limit()), 0, buf.limit()) 413 .newCodedInput(); 414 } 415 cis.enableAliasing(true); 416 return cis; 417 } 418 419 // Reads the connection header following version 420 private void processConnectionHeader(ByteBuff buf) throws IOException { 421 this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf)); 422 423 // we want to copy the attributes prior to releasing the buffer so that they don't get corrupted 424 // eventually 425 if (connectionHeader.getAttributeList().isEmpty()) { 426 this.connectionAttributes = Collections.emptyMap(); 427 } else { 428 this.connectionAttributes = 429 Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size()); 430 for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) { 431 this.connectionAttributes.put(nameBytesPair.getName(), 432 nameBytesPair.getValue().toByteArray()); 433 } 434 } 435 String serviceName = connectionHeader.getServiceName(); 436 if (serviceName == null) { 437 throw new EmptyServiceNameException(); 438 } 439 this.service = RpcServer.getService(this.rpcServer.services, serviceName); 440 if (this.service == null) { 441 throw new UnknownServiceException(serviceName); 442 } 443 setupCellBlockCodecs(); 444 sendConnectionHeaderResponseIfNeeded(); 445 UserGroupInformation protocolUser = createUser(connectionHeader); 446 if (!useSasl) { 447 ugi = protocolUser; 448 if (ugi != null) { 449 ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE); 450 } 451 // audit logging for SASL authenticated users happens in saslReadAndProcess() 452 if (authenticatedWithFallback) { 453 RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi, 454 getHostAddress()); 455 } 456 } else { 457 // user is authenticated 458 ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod()); 459 // Now we check if this is a proxy user case. If the protocol user is 460 // different from the 'user', it is a proxy user scenario. However, 461 // this is not allowed if user authenticated with DIGEST. 462 if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) { 463 if (!provider.supportsProtocolAuthentication()) { 464 // Not allowed to doAs if token authentication is used 465 throw new AccessDeniedException("Authenticated user (" + ugi 466 + ") doesn't match what the client claims to be (" + protocolUser + ")"); 467 } else { 468 // Effective user can be different from authenticated user 469 // for simple auth or kerberos auth 470 // The user is the real user. Now we create a proxy user 471 UserGroupInformation realUser = ugi; 472 ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser); 473 // Now the user is a proxy user, set Authentication method Proxy. 474 ugi.setAuthenticationMethod(AuthenticationMethod.PROXY); 475 } 476 } 477 } 478 String version; 479 if (this.connectionHeader.hasVersionInfo()) { 480 // see if this connection will support RetryImmediatelyException 481 this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2); 482 version = this.connectionHeader.getVersionInfo().getVersion(); 483 } else { 484 version = "UNKNOWN"; 485 } 486 RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}", 487 this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName); 488 } 489 490 public ConnectionHeader getConnectionHeader() { 491 return connectionHeader; 492 } 493 494 /** 495 * Send the response for connection header 496 */ 497 private void sendConnectionHeaderResponseIfNeeded() throws FatalConnectionException { 498 Pair<RPCProtos.ConnectionHeaderResponse, CryptoAES> pair = setupCryptoCipher(); 499 // Response the connection header if Crypto AES is enabled 500 if (pair == null) { 501 return; 502 } 503 try { 504 int size = pair.getFirst().getSerializedSize(); 505 BufferChain bc; 506 try (ByteBufferOutputStream bbOut = new ByteBufferOutputStream(4 + size); 507 DataOutputStream out = new DataOutputStream(bbOut)) { 508 out.writeInt(size); 509 pair.getFirst().writeTo(out); 510 bc = new BufferChain(bbOut.getByteBuffer()); 511 } 512 doRespond(new RpcResponse() { 513 514 @Override 515 public BufferChain getResponse() { 516 return bc; 517 } 518 519 @Override 520 public void done() { 521 // must switch after sending the connection header response, as the client still uses the 522 // original SaslClient to unwrap the data we send back 523 saslServer.switchToCryptoAES(pair.getSecond()); 524 } 525 }); 526 } catch (IOException ex) { 527 throw new UnsupportedCryptoException(ex.getMessage(), ex); 528 } 529 } 530 531 protected abstract void doRespond(RpcResponse resp) throws IOException; 532 533 /** 534 * Has the request header and the request param and optionally encoded data buffer all in this one 535 * array. 536 * <p/> 537 * Will be overridden in tests. 538 */ 539 protected void processRequest(ByteBuff buf) throws IOException, InterruptedException { 540 long totalRequestSize = buf.limit(); 541 int offset = 0; 542 // Here we read in the header. We avoid having pb 543 // do its default 4k allocation for CodedInputStream. We force it to use 544 // backing array. 545 CodedInputStream cis = createCis(buf); 546 int headerSize = cis.readRawVarint32(); 547 offset = cis.getTotalBytesRead(); 548 Message.Builder builder = RequestHeader.newBuilder(); 549 ProtobufUtil.mergeFrom(builder, cis, headerSize); 550 RequestHeader header = (RequestHeader) builder.build(); 551 offset += headerSize; 552 Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() 553 .extract(Context.current(), header.getTraceInfo(), getter); 554 555 // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope 556 // are early-exits due to error cases. There's only one success path, the asynchronous call to 557 // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented 558 // by null-ing out the reference in this scope. All other paths end the span. Thus, and in 559 // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally 560 // block iff the span is non-null. 561 Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx); 562 try (Scope ignored = span.makeCurrent()) { 563 int id = header.getCallId(); 564 // HBASE-28128 - if server is aborting, don't bother trying to process. It will 565 // fail at the handler layer, but worse might result in CallQueueTooBigException if the 566 // queue is full but server is not properly processing requests. Better to throw an aborted 567 // exception here so that the client can properly react. 568 if (rpcServer.server != null && rpcServer.server.isAborted()) { 569 RegionServerAbortedException serverIsAborted = new RegionServerAbortedException( 570 "Server " + rpcServer.server.getServerName() + " aborting"); 571 this.rpcServer.metrics.exception(serverIsAborted); 572 sendErrorResponseForCall(id, totalRequestSize, span, serverIsAborted.getMessage(), 573 serverIsAborted); 574 return; 575 } 576 577 if (RpcServer.LOG.isTraceEnabled()) { 578 RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) 579 + " totalRequestSize: " + totalRequestSize + " bytes"); 580 } 581 // Enforcing the call queue size, this triggers a retry in the client 582 // This is a bit late to be doing this check - we have already read in the 583 // total request. 584 if ( 585 (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) 586 > this.rpcServer.maxQueueSizeInBytes 587 ) { 588 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 589 sendErrorResponseForCall(id, totalRequestSize, span, 590 "Call queue is full on " + this.rpcServer.server.getServerName() 591 + ", is hbase.ipc.server.max.callqueue.size too small?", 592 RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 593 return; 594 } 595 MethodDescriptor md = null; 596 Message param = null; 597 ExtendedCellScanner cellScanner = null; 598 try { 599 if (header.hasRequestParam() && header.getRequestParam()) { 600 md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); 601 if (md == null) { 602 throw new UnsupportedOperationException(header.getMethodName()); 603 } 604 builder = this.service.getRequestPrototype(md).newBuilderForType(); 605 cis.resetSizeCounter(); 606 int paramSize = cis.readRawVarint32(); 607 offset += cis.getTotalBytesRead(); 608 if (builder != null) { 609 ProtobufUtil.mergeFrom(builder, cis, paramSize); 610 param = builder.build(); 611 } 612 offset += paramSize; 613 } else { 614 // currently header must have request param, so we directly throw 615 // exception here 616 String msg = "Invalid request header: " + TextFormat.shortDebugString(header) 617 + ", should have param set in it"; 618 RpcServer.LOG.warn(msg); 619 throw new DoNotRetryIOException(msg); 620 } 621 if (header.hasCellBlockMeta()) { 622 buf.position(offset); 623 ByteBuff dup = buf.duplicate(); 624 dup.limit(offset + header.getCellBlockMeta().getLength()); 625 cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec, 626 this.compressionCodec, dup); 627 } 628 } catch (Throwable thrown) { 629 InetSocketAddress address = this.rpcServer.getListenerAddress(); 630 String msg = (address != null ? address : "(channel closed)") 631 + " is unable to read call parameter from client " + getHostAddress(); 632 RpcServer.LOG.warn(msg, thrown); 633 634 this.rpcServer.metrics.exception(thrown); 635 636 final Throwable responseThrowable; 637 if (thrown instanceof LinkageError) { 638 // probably the hbase hadoop version does not match the running hadoop version 639 responseThrowable = new DoNotRetryIOException(thrown); 640 } else if (thrown instanceof UnsupportedOperationException) { 641 // If the method is not present on the server, do not retry. 642 responseThrowable = new DoNotRetryIOException(thrown); 643 } else { 644 responseThrowable = thrown; 645 } 646 647 sendErrorResponseForCall(id, totalRequestSize, span, 648 msg + "; " + responseThrowable.getMessage(), responseThrowable); 649 return; 650 } 651 652 int timeout = 0; 653 if (header.hasTimeout() && header.getTimeout() > 0) { 654 timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); 655 } 656 ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, 657 totalRequestSize, this.addr, timeout, this.callCleanup); 658 659 if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { 660 // unset span do that it's not closed in the finally block 661 span = null; 662 } else { 663 this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); 664 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 665 call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, 666 "Call queue is full on " + this.rpcServer.server.getServerName() 667 + ", too many items queued ?"); 668 TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 669 call.sendResponseIfReady(); 670 } 671 } finally { 672 if (span != null) { 673 span.end(); 674 } 675 } 676 } 677 678 private void sendErrorResponseForCall(int id, long totalRequestSize, Span span, String msg, 679 Throwable responseThrowable) throws IOException { 680 ServerCall<?> failedcall = createCall(id, this.service, null, null, null, null, 681 totalRequestSize, null, 0, this.callCleanup); 682 failedcall.setResponse(null, null, responseThrowable, msg); 683 TraceUtil.setError(span, responseThrowable); 684 failedcall.sendResponseIfReady(); 685 } 686 687 protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException { 688 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1); 689 ServerCall.setExceptionResponse(e, msg, headerBuilder); 690 ByteBuffer headerBuf = 691 ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null); 692 BufferChain buf = new BufferChain(headerBuf); 693 return () -> buf; 694 } 695 696 private void doBadPreambleHandling(String msg) throws IOException { 697 doBadPreambleHandling(msg, new FatalConnectionException(msg)); 698 } 699 700 private void doBadPreambleHandling(String msg, Exception e) throws IOException { 701 RpcServer.LOG.warn(msg, e); 702 doRespond(getErrorResponse(msg, e)); 703 } 704 705 private void doPreambleResponse(Message resp) throws IOException { 706 ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build(); 707 ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null); 708 BufferChain bufChain = new BufferChain(buf); 709 doRespond(() -> bufChain); 710 } 711 712 private boolean doConnectionRegistryResponse() throws IOException { 713 if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) { 714 // should be in tests or some scenarios where we should not reach here 715 return false; 716 } 717 // on backup masters, this request may be blocked since we need to fetch it from filesystem, 718 // but since it is just backup master, it is not a critical problem 719 String clusterId = ((ConnectionRegistryEndpoint) rpcServer.server).getClusterId(); 720 RpcServer.LOG.debug("Response connection registry, clusterId = '{}'", clusterId); 721 if (clusterId == null) { 722 // should be in tests or some scenarios where we should not reach here 723 return false; 724 } 725 GetConnectionRegistryResponse resp = 726 GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build(); 727 doPreambleResponse(resp); 728 return true; 729 } 730 731 private void doSecurityPreambleResponse() throws IOException { 732 if (rpcServer.isSecurityEnabled) { 733 SecurityPreamableResponse resp = SecurityPreamableResponse.newBuilder() 734 .setServerPrincipal(rpcServer.serverPrincipal).build(); 735 doPreambleResponse(resp); 736 } else { 737 // security is not enabled, do not need a principal when connecting, throw a special exception 738 // to let client know it should just use simple authentication 739 doRespond(getErrorResponse("security is not enabled", new SecurityNotEnabledException())); 740 } 741 } 742 743 protected final void callCleanupIfNeeded() { 744 if (callCleanup != null) { 745 callCleanup.run(); 746 callCleanup = null; 747 } 748 } 749 750 protected enum PreambleResponse { 751 SUCCEED, // successfully processed the rpc preamble header 752 CONTINUE, // the preamble header is for other purpose, wait for the rpc preamble header 753 CLOSE // close the rpc connection 754 } 755 756 protected final PreambleResponse processPreamble(ByteBuffer preambleBuffer) throws IOException { 757 assert preambleBuffer.remaining() == 6; 758 if ( 759 ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6, 760 RpcClient.REGISTRY_PREAMBLE_HEADER, 0, 6) && doConnectionRegistryResponse() 761 ) { 762 return PreambleResponse.CLOSE; 763 } 764 if ( 765 ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6, 766 RpcClient.SECURITY_PREAMBLE_HEADER, 0, 6) 767 ) { 768 doSecurityPreambleResponse(); 769 return PreambleResponse.CONTINUE; 770 } 771 if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, RPC_HEADER, 0, 4)) { 772 doBadPreambleHandling( 773 "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" 774 + Bytes.toStringBinary( 775 ByteBufferUtils.toBytes(preambleBuffer, preambleBuffer.position(), RPC_HEADER.length), 776 0, RPC_HEADER.length) 777 + " from " + toString()); 778 return PreambleResponse.CLOSE; 779 } 780 int version = preambleBuffer.get(preambleBuffer.position() + 4) & 0xFF; 781 byte authByte = preambleBuffer.get(preambleBuffer.position() + 5); 782 if (version != RpcServer.CURRENT_VERSION) { 783 String msg = getFatalConnectionString(version, authByte); 784 doBadPreambleHandling(msg, new WrongVersionException(msg)); 785 return PreambleResponse.CLOSE; 786 } 787 788 this.provider = rpcServer.saslProviders.selectProvider(authByte); 789 if (this.provider == null) { 790 String msg = getFatalConnectionString(version, authByte); 791 doBadPreambleHandling(msg, new BadAuthException(msg)); 792 return PreambleResponse.CLOSE; 793 } 794 // TODO this is a wart while simple auth'n doesn't go through sasl. 795 if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) { 796 if (this.rpcServer.allowFallbackToSimpleAuth) { 797 this.rpcServer.metrics.authenticationFallback(); 798 authenticatedWithFallback = true; 799 } else { 800 AccessDeniedException ae = new AccessDeniedException("Authentication is required"); 801 doRespond(getErrorResponse(ae.getMessage(), ae)); 802 return PreambleResponse.CLOSE; 803 } 804 } 805 if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) { 806 doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, 807 null); 808 provider = rpcServer.saslProviders.getSimpleProvider(); 809 // client has already sent the initial Sasl message and we 810 // should ignore it. Both client and server should fall back 811 // to simple auth from now on. 812 skipInitialSaslHandshake = true; 813 } 814 useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider); 815 return PreambleResponse.SUCCEED; 816 } 817 818 boolean isSimpleAuthentication() { 819 return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider; 820 } 821 822 public abstract boolean isConnectionOpen(); 823 824 public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md, 825 RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size, 826 InetAddress remoteAddress, int timeout, CallCleanup reqCleanup); 827 828 private static class ByteBuffByteInput extends ByteInput { 829 830 private ByteBuff buf; 831 private int length; 832 833 ByteBuffByteInput(ByteBuff buf, int length) { 834 this.buf = buf; 835 this.length = length; 836 } 837 838 @Override 839 public byte read(int offset) { 840 return this.buf.get(offset); 841 } 842 843 @Override 844 public int read(int offset, byte[] out, int outOffset, int len) { 845 this.buf.get(offset, out, outOffset, len); 846 return len; 847 } 848 849 @Override 850 public int read(int offset, ByteBuffer out) { 851 int len = out.remaining(); 852 this.buf.get(out, offset, len); 853 return len; 854 } 855 856 @Override 857 public long readLong(int offset) { 858 return this.buf.getLong(offset); 859 } 860 861 @Override 862 public int size() { 863 return this.length; 864 } 865 866 } 867}