001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER; 021 022import io.opentelemetry.api.GlobalOpenTelemetry; 023import io.opentelemetry.api.trace.Span; 024import io.opentelemetry.context.Context; 025import io.opentelemetry.context.Scope; 026import io.opentelemetry.context.propagation.TextMapGetter; 027import java.io.ByteArrayInputStream; 028import java.io.Closeable; 029import java.io.DataOutputStream; 030import java.io.IOException; 031import java.net.InetAddress; 032import java.net.InetSocketAddress; 033import java.nio.ByteBuffer; 034import java.nio.channels.Channels; 035import java.nio.channels.ReadableByteChannel; 036import java.security.GeneralSecurityException; 037import java.util.Objects; 038import java.util.Properties; 039import org.apache.commons.crypto.cipher.CryptoCipherFactory; 040import org.apache.commons.crypto.random.CryptoRandom; 041import org.apache.commons.crypto.random.CryptoRandomFactory; 042import org.apache.hadoop.hbase.CellScanner; 043import org.apache.hadoop.hbase.DoNotRetryIOException; 044import org.apache.hadoop.hbase.client.VersionInfoUtil; 045import org.apache.hadoop.hbase.codec.Codec; 046import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 047import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 048import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 049import org.apache.hadoop.hbase.nio.ByteBuff; 050import org.apache.hadoop.hbase.nio.SingleByteBuff; 051import org.apache.hadoop.hbase.security.AccessDeniedException; 052import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; 053import org.apache.hadoop.hbase.security.SaslStatus; 054import org.apache.hadoop.hbase.security.SaslUtil; 055import org.apache.hadoop.hbase.security.User; 056import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider; 057import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders; 058import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider; 059import org.apache.hadoop.hbase.trace.TraceUtil; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.io.BytesWritable; 062import org.apache.hadoop.io.IntWritable; 063import org.apache.hadoop.io.Writable; 064import org.apache.hadoop.io.WritableUtils; 065import org.apache.hadoop.io.compress.CompressionCodec; 066import org.apache.hadoop.security.UserGroupInformation; 067import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 068import org.apache.hadoop.security.authorize.AuthorizationException; 069import org.apache.hadoop.security.authorize.ProxyUsers; 070import org.apache.hadoop.security.token.SecretManager.InvalidToken; 071import org.apache.yetus.audience.InterfaceAudience; 072 073import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 074import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; 075import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 076import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 077import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 078import org.apache.hbase.thirdparty.com.google.protobuf.Message; 079import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 080import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 081 082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; 090 091/** Reads calls from a connection and queues them for handling. */ 092@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", 093 justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") 094@InterfaceAudience.Private 095abstract class ServerRpcConnection implements Closeable { 096 097 private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter(); 098 099 protected final RpcServer rpcServer; 100 // If the connection header has been read or not. 101 protected boolean connectionHeaderRead = false; 102 103 protected CallCleanup callCleanup; 104 105 // Cache the remote host & port info so that even if the socket is 106 // disconnected, we can say where it used to connect to. 107 protected String hostAddress; 108 protected int remotePort; 109 protected InetAddress addr; 110 protected ConnectionHeader connectionHeader; 111 112 /** 113 * Codec the client asked use. 114 */ 115 protected Codec codec; 116 /** 117 * Compression codec the client asked us use. 118 */ 119 protected CompressionCodec compressionCodec; 120 protected BlockingService service; 121 122 protected SaslServerAuthenticationProvider provider; 123 protected boolean saslContextEstablished; 124 protected boolean skipInitialSaslHandshake; 125 private ByteBuffer unwrappedData; 126 // When is this set? FindBugs wants to know! Says NP 127 private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); 128 protected boolean useSasl; 129 protected HBaseSaslRpcServer saslServer; 130 protected CryptoAES cryptoAES; 131 protected boolean useWrap = false; 132 protected boolean useCryptoAesWrap = false; 133 134 // was authentication allowed with a fallback to simple auth 135 protected boolean authenticatedWithFallback; 136 137 protected boolean retryImmediatelySupported = false; 138 139 protected User user = null; 140 protected UserGroupInformation ugi = null; 141 protected SaslServerAuthenticationProviders saslProviders = null; 142 143 public ServerRpcConnection(RpcServer rpcServer) { 144 this.rpcServer = rpcServer; 145 this.callCleanup = null; 146 this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf()); 147 } 148 149 @Override 150 public String toString() { 151 return getHostAddress() + ":" + remotePort; 152 } 153 154 public String getHostAddress() { 155 return hostAddress; 156 } 157 158 public InetAddress getHostInetAddress() { 159 return addr; 160 } 161 162 public int getRemotePort() { 163 return remotePort; 164 } 165 166 public VersionInfo getVersionInfo() { 167 if (connectionHeader.hasVersionInfo()) { 168 return connectionHeader.getVersionInfo(); 169 } 170 return null; 171 } 172 173 private String getFatalConnectionString(final int version, final byte authByte) { 174 return "serverVersion=" + RpcServer.CURRENT_VERSION + ", clientVersion=" + version 175 + ", authMethod=" + authByte + 176 // The provider may be null if we failed to parse the header of the request 177 ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName()) 178 + " from " + toString(); 179 } 180 181 /** 182 * Set up cell block codecs n 183 */ 184 private void setupCellBlockCodecs(final ConnectionHeader header) throws FatalConnectionException { 185 // TODO: Plug in other supported decoders. 186 if (!header.hasCellBlockCodecClass()) return; 187 String className = header.getCellBlockCodecClass(); 188 if (className == null || className.length() == 0) return; 189 try { 190 this.codec = (Codec) Class.forName(className).getDeclaredConstructor().newInstance(); 191 } catch (Exception e) { 192 throw new UnsupportedCellCodecException(className, e); 193 } 194 if (!header.hasCellBlockCompressorClass()) return; 195 className = header.getCellBlockCompressorClass(); 196 try { 197 this.compressionCodec = 198 (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance(); 199 } catch (Exception e) { 200 throw new UnsupportedCompressionCodecException(className, e); 201 } 202 } 203 204 /** 205 * Set up cipher for rpc encryption with Apache Commons Crypto n 206 */ 207 private void setupCryptoCipher(final ConnectionHeader header, 208 RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) throws FatalConnectionException { 209 // If simple auth, return 210 if (saslServer == null) return; 211 // check if rpc encryption with Crypto AES 212 String qop = saslServer.getNegotiatedQop(); 213 boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop().equalsIgnoreCase(qop); 214 boolean isCryptoAesEncryption = isEncryption 215 && this.rpcServer.conf.getBoolean("hbase.rpc.crypto.encryption.aes.enabled", false); 216 if (!isCryptoAesEncryption) return; 217 if (!header.hasRpcCryptoCipherTransformation()) return; 218 String transformation = header.getRpcCryptoCipherTransformation(); 219 if (transformation == null || transformation.length() == 0) return; 220 // Negotiates AES based on complete saslServer. 221 // The Crypto metadata need to be encrypted and send to client. 222 Properties properties = new Properties(); 223 // the property for SecureRandomFactory 224 properties.setProperty(CryptoRandomFactory.CLASSES_KEY, 225 this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random", 226 "org.apache.commons.crypto.random.JavaCryptoRandom")); 227 // the property for cipher class 228 properties.setProperty(CryptoCipherFactory.CLASSES_KEY, 229 this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class", 230 "org.apache.commons.crypto.cipher.JceCipher")); 231 232 int cipherKeyBits = 233 this.rpcServer.conf.getInt("hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128); 234 // generate key and iv 235 if (cipherKeyBits % 8 != 0) { 236 throw new IllegalArgumentException( 237 "The AES cipher key size in bits" + " should be a multiple of byte"); 238 } 239 int len = cipherKeyBits / 8; 240 byte[] inKey = new byte[len]; 241 byte[] outKey = new byte[len]; 242 byte[] inIv = new byte[len]; 243 byte[] outIv = new byte[len]; 244 245 try { 246 // generate the cipher meta data with SecureRandom 247 CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); 248 secureRandom.nextBytes(inKey); 249 secureRandom.nextBytes(outKey); 250 secureRandom.nextBytes(inIv); 251 secureRandom.nextBytes(outIv); 252 253 // create CryptoAES for server 254 cryptoAES = new CryptoAES(transformation, properties, inKey, outKey, inIv, outIv); 255 // create SaslCipherMeta and send to client, 256 // for client, the [inKey, outKey], [inIv, outIv] should be reversed 257 RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder(); 258 ccmBuilder.setTransformation(transformation); 259 ccmBuilder.setInIv(getByteString(outIv)); 260 ccmBuilder.setInKey(getByteString(outKey)); 261 ccmBuilder.setOutIv(getByteString(inIv)); 262 ccmBuilder.setOutKey(getByteString(inKey)); 263 chrBuilder.setCryptoCipherMeta(ccmBuilder); 264 useCryptoAesWrap = true; 265 } catch (GeneralSecurityException | IOException ex) { 266 throw new UnsupportedCryptoException(ex.getMessage(), ex); 267 } 268 } 269 270 private ByteString getByteString(byte[] bytes) { 271 // return singleton to reduce object allocation 272 return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); 273 } 274 275 private UserGroupInformation createUser(ConnectionHeader head) { 276 UserGroupInformation ugi = null; 277 278 if (!head.hasUserInfo()) { 279 return null; 280 } 281 UserInformation userInfoProto = head.getUserInfo(); 282 String effectiveUser = null; 283 if (userInfoProto.hasEffectiveUser()) { 284 effectiveUser = userInfoProto.getEffectiveUser(); 285 } 286 String realUser = null; 287 if (userInfoProto.hasRealUser()) { 288 realUser = userInfoProto.getRealUser(); 289 } 290 if (effectiveUser != null) { 291 if (realUser != null) { 292 UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser); 293 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi); 294 } else { 295 ugi = UserGroupInformation.createRemoteUser(effectiveUser); 296 } 297 } 298 return ugi; 299 } 300 301 protected final void disposeSasl() { 302 if (saslServer != null) { 303 saslServer.dispose(); 304 saslServer = null; 305 } 306 } 307 308 /** 309 * No protobuf encoding of raw sasl messages 310 */ 311 protected final void doRawSaslReply(SaslStatus status, Writable rv, String errorClass, 312 String error) throws IOException { 313 BufferChain bc; 314 // In my testing, have noticed that sasl messages are usually 315 // in the ballpark of 100-200. That's why the initial capacity is 256. 316 try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256); 317 DataOutputStream out = new DataOutputStream(saslResponse)) { 318 out.writeInt(status.state); // write status 319 if (status == SaslStatus.SUCCESS) { 320 rv.write(out); 321 } else { 322 WritableUtils.writeString(out, errorClass); 323 WritableUtils.writeString(out, error); 324 } 325 bc = new BufferChain(saslResponse.getByteBuffer()); 326 } 327 doRespond(() -> bc); 328 } 329 330 public void saslReadAndProcess(ByteBuff saslToken) throws IOException, InterruptedException { 331 if (saslContextEstablished) { 332 RpcServer.LOG.trace("Read input token of size={} for processing by saslServer.unwrap()", 333 saslToken.limit()); 334 if (!useWrap) { 335 processOneRpc(saslToken); 336 } else { 337 byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes(); 338 byte[] plaintextData; 339 if (useCryptoAesWrap) { 340 // unwrap with CryptoAES 341 plaintextData = cryptoAES.unwrap(b, 0, b.length); 342 } else { 343 plaintextData = saslServer.unwrap(b, 0, b.length); 344 } 345 // release the request buffer as we have already unwrapped all its content 346 callCleanupIfNeeded(); 347 processUnwrappedData(plaintextData); 348 } 349 } else { 350 byte[] replyToken; 351 try { 352 if (saslServer == null) { 353 try { 354 saslServer = 355 new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager); 356 } catch (Exception e) { 357 RpcServer.LOG.error("Error when trying to create instance of HBaseSaslRpcServer " 358 + "with sasl provider: " + provider, e); 359 throw e; 360 } 361 RpcServer.LOG.debug("Created SASL server with mechanism={}", 362 provider.getSaslAuthMethod().getAuthMethod()); 363 } 364 RpcServer.LOG.debug( 365 "Read input token of size={} for processing by saslServer." + "evaluateResponse()", 366 saslToken.limit()); 367 replyToken = saslServer 368 .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes()); 369 } catch (IOException e) { 370 RpcServer.LOG.debug("Failed to execute SASL handshake", e); 371 IOException sendToClient = e; 372 Throwable cause = e; 373 while (cause != null) { 374 if (cause instanceof InvalidToken) { 375 sendToClient = (InvalidToken) cause; 376 break; 377 } 378 cause = cause.getCause(); 379 } 380 doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(), 381 sendToClient.getLocalizedMessage()); 382 this.rpcServer.metrics.authenticationFailure(); 383 String clientIP = this.toString(); 384 // attempting user could be null 385 RpcServer.AUDITLOG.warn("{} {}: {}", RpcServer.AUTH_FAILED_FOR, clientIP, 386 saslServer.getAttemptingUser()); 387 throw e; 388 } finally { 389 // release the request buffer as we have already unwrapped all its content 390 callCleanupIfNeeded(); 391 } 392 if (replyToken != null) { 393 if (RpcServer.LOG.isDebugEnabled()) { 394 RpcServer.LOG.debug("Will send token of size " + replyToken.length + " from saslServer."); 395 } 396 doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null); 397 } 398 if (saslServer.isComplete()) { 399 String qop = saslServer.getNegotiatedQop(); 400 useWrap = qop != null && !"auth".equalsIgnoreCase(qop); 401 ugi = 402 provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager); 403 RpcServer.LOG.debug( 404 "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi, 405 qop); 406 this.rpcServer.metrics.authenticationSuccess(); 407 RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); 408 saslContextEstablished = true; 409 } 410 } 411 } 412 413 private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException { 414 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); 415 // Read all RPCs contained in the inBuf, even partial ones 416 while (true) { 417 int count; 418 if (unwrappedDataLengthBuffer.remaining() > 0) { 419 count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer); 420 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) { 421 return; 422 } 423 } 424 425 if (unwrappedData == null) { 426 unwrappedDataLengthBuffer.flip(); 427 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); 428 429 if (unwrappedDataLength == RpcClient.PING_CALL_ID) { 430 if (RpcServer.LOG.isDebugEnabled()) RpcServer.LOG.debug("Received ping message"); 431 unwrappedDataLengthBuffer.clear(); 432 continue; // ping message 433 } 434 unwrappedData = ByteBuffer.allocate(unwrappedDataLength); 435 } 436 437 count = this.rpcServer.channelRead(ch, unwrappedData); 438 if (count <= 0 || unwrappedData.remaining() > 0) { 439 return; 440 } 441 442 if (unwrappedData.remaining() == 0) { 443 unwrappedDataLengthBuffer.clear(); 444 unwrappedData.flip(); 445 processOneRpc(new SingleByteBuff(unwrappedData)); 446 unwrappedData = null; 447 } 448 } 449 } 450 451 public void processOneRpc(ByteBuff buf) throws IOException, InterruptedException { 452 if (connectionHeaderRead) { 453 processRequest(buf); 454 } else { 455 processConnectionHeader(buf); 456 this.connectionHeaderRead = true; 457 if (rpcServer.needAuthorization() && !authorizeConnection()) { 458 // Throw FatalConnectionException wrapping ACE so client does right thing and closes 459 // down the connection instead of trying to read non-existent retun. 460 throw new AccessDeniedException("Connection from " + this + " for service " 461 + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi); 462 } 463 this.user = this.rpcServer.userProvider.create(this.ugi); 464 } 465 } 466 467 private boolean authorizeConnection() throws IOException { 468 try { 469 // If auth method is DIGEST, the token was obtained by the 470 // real user for the effective user, therefore not required to 471 // authorize real user. doAs is allowed only for simple or kerberos 472 // authentication 473 if (ugi != null && ugi.getRealUser() != null && provider.supportsProtocolAuthentication()) { 474 ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf); 475 } 476 this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress()); 477 this.rpcServer.metrics.authorizationSuccess(); 478 } catch (AuthorizationException ae) { 479 if (RpcServer.LOG.isDebugEnabled()) { 480 RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae); 481 } 482 this.rpcServer.metrics.authorizationFailure(); 483 doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae))); 484 return false; 485 } 486 return true; 487 } 488 489 // Reads the connection header following version 490 private void processConnectionHeader(ByteBuff buf) throws IOException { 491 if (buf.hasArray()) { 492 this.connectionHeader = ConnectionHeader.parseFrom(buf.array()); 493 } else { 494 CodedInputStream cis = UnsafeByteOperations 495 .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput(); 496 cis.enableAliasing(true); 497 this.connectionHeader = ConnectionHeader.parseFrom(cis); 498 } 499 String serviceName = connectionHeader.getServiceName(); 500 if (serviceName == null) throw new EmptyServiceNameException(); 501 this.service = RpcServer.getService(this.rpcServer.services, serviceName); 502 if (this.service == null) throw new UnknownServiceException(serviceName); 503 setupCellBlockCodecs(this.connectionHeader); 504 RPCProtos.ConnectionHeaderResponse.Builder chrBuilder = 505 RPCProtos.ConnectionHeaderResponse.newBuilder(); 506 setupCryptoCipher(this.connectionHeader, chrBuilder); 507 responseConnectionHeader(chrBuilder); 508 UserGroupInformation protocolUser = createUser(connectionHeader); 509 if (!useSasl) { 510 ugi = protocolUser; 511 if (ugi != null) { 512 ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE); 513 } 514 // audit logging for SASL authenticated users happens in saslReadAndProcess() 515 if (authenticatedWithFallback) { 516 RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", ugi, 517 getHostAddress()); 518 } 519 } else { 520 // user is authenticated 521 ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod()); 522 // Now we check if this is a proxy user case. If the protocol user is 523 // different from the 'user', it is a proxy user scenario. However, 524 // this is not allowed if user authenticated with DIGEST. 525 if ((protocolUser != null) && (!protocolUser.getUserName().equals(ugi.getUserName()))) { 526 if (!provider.supportsProtocolAuthentication()) { 527 // Not allowed to doAs if token authentication is used 528 throw new AccessDeniedException("Authenticated user (" + ugi 529 + ") doesn't match what the client claims to be (" + protocolUser + ")"); 530 } else { 531 // Effective user can be different from authenticated user 532 // for simple auth or kerberos auth 533 // The user is the real user. Now we create a proxy user 534 UserGroupInformation realUser = ugi; 535 ugi = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser); 536 // Now the user is a proxy user, set Authentication method Proxy. 537 ugi.setAuthenticationMethod(AuthenticationMethod.PROXY); 538 } 539 } 540 } 541 String version; 542 if (this.connectionHeader.hasVersionInfo()) { 543 // see if this connection will support RetryImmediatelyException 544 this.retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2); 545 version = this.connectionHeader.getVersionInfo().getVersion(); 546 } else { 547 version = "UNKNOWN"; 548 } 549 RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}", 550 this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName); 551 } 552 553 /** 554 * Send the response for connection header 555 */ 556 private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) 557 throws FatalConnectionException { 558 // Response the connection header if Crypto AES is enabled 559 if (!chrBuilder.hasCryptoCipherMeta()) return; 560 try { 561 byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray(); 562 // encrypt the Crypto AES cipher meta data with sasl server, and send to client 563 byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4]; 564 Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4); 565 Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length); 566 byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length); 567 BufferChain bc; 568 try (ByteBufferOutputStream response = new ByteBufferOutputStream(wrapped.length + 4); 569 DataOutputStream out = new DataOutputStream(response)) { 570 out.writeInt(wrapped.length); 571 out.write(wrapped); 572 bc = new BufferChain(response.getByteBuffer()); 573 } 574 doRespond(() -> bc); 575 } catch (IOException ex) { 576 throw new UnsupportedCryptoException(ex.getMessage(), ex); 577 } 578 } 579 580 protected abstract void doRespond(RpcResponse resp) throws IOException; 581 582 /** 583 * n * Has the request header and the request param and optionally encoded data buffer all in this 584 * one array. nn 585 */ 586 protected void processRequest(ByteBuff buf) throws IOException, InterruptedException { 587 long totalRequestSize = buf.limit(); 588 int offset = 0; 589 // Here we read in the header. We avoid having pb 590 // do its default 4k allocation for CodedInputStream. We force it to use 591 // backing array. 592 CodedInputStream cis; 593 if (buf.hasArray()) { 594 cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput(); 595 } else { 596 cis = UnsafeByteOperations 597 .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput(); 598 } 599 cis.enableAliasing(true); 600 int headerSize = cis.readRawVarint32(); 601 offset = cis.getTotalBytesRead(); 602 Message.Builder builder = RequestHeader.newBuilder(); 603 ProtobufUtil.mergeFrom(builder, cis, headerSize); 604 RequestHeader header = (RequestHeader) builder.build(); 605 offset += headerSize; 606 Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() 607 .extract(Context.current(), header.getTraceInfo(), getter); 608 609 // n.b. Management of this Span instance is a little odd. Most exit paths from this try scope 610 // are early-exits due to error cases. There's only one success path, the asynchronous call to 611 // RpcScheduler#dispatch. The success path assumes ownership of the span, which is represented 612 // by null-ing out the reference in this scope. All other paths end the span. Thus, and in 613 // order to avoid accidentally orphaning the span, the call to Span#end happens in a finally 614 // block iff the span is non-null. 615 Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx); 616 try (Scope ignored = span.makeCurrent()) { 617 int id = header.getCallId(); 618 if (RpcServer.LOG.isTraceEnabled()) { 619 RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) 620 + " totalRequestSize: " + totalRequestSize + " bytes"); 621 } 622 // Enforcing the call queue size, this triggers a retry in the client 623 // This is a bit late to be doing this check - we have already read in the 624 // total request. 625 if ( 626 (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) 627 > this.rpcServer.maxQueueSizeInBytes 628 ) { 629 final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null, 630 totalRequestSize, null, 0, this.callCleanup); 631 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 632 callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, 633 "Call queue is full on " + this.rpcServer.server.getServerName() 634 + ", is hbase.ipc.server.max.callqueue.size too small?"); 635 TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 636 callTooBig.sendResponseIfReady(); 637 return; 638 } 639 MethodDescriptor md = null; 640 Message param = null; 641 CellScanner cellScanner = null; 642 try { 643 if (header.hasRequestParam() && header.getRequestParam()) { 644 md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); 645 if (md == null) { 646 throw new UnsupportedOperationException(header.getMethodName()); 647 } 648 builder = this.service.getRequestPrototype(md).newBuilderForType(); 649 cis.resetSizeCounter(); 650 int paramSize = cis.readRawVarint32(); 651 offset += cis.getTotalBytesRead(); 652 if (builder != null) { 653 ProtobufUtil.mergeFrom(builder, cis, paramSize); 654 param = builder.build(); 655 } 656 offset += paramSize; 657 } else { 658 // currently header must have request param, so we directly throw 659 // exception here 660 String msg = "Invalid request header: " + TextFormat.shortDebugString(header) 661 + ", should have param set in it"; 662 RpcServer.LOG.warn(msg); 663 throw new DoNotRetryIOException(msg); 664 } 665 if (header.hasCellBlockMeta()) { 666 buf.position(offset); 667 ByteBuff dup = buf.duplicate(); 668 dup.limit(offset + header.getCellBlockMeta().getLength()); 669 cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec, 670 this.compressionCodec, dup); 671 } 672 } catch (Throwable thrown) { 673 InetSocketAddress address = this.rpcServer.getListenerAddress(); 674 String msg = (address != null ? address : "(channel closed)") 675 + " is unable to read call parameter from client " + getHostAddress(); 676 RpcServer.LOG.warn(msg, thrown); 677 678 this.rpcServer.metrics.exception(thrown); 679 680 final Throwable responseThrowable; 681 if (thrown instanceof LinkageError) { 682 // probably the hbase hadoop version does not match the running hadoop version 683 responseThrowable = new DoNotRetryIOException(thrown); 684 } else if (thrown instanceof UnsupportedOperationException) { 685 // If the method is not present on the server, do not retry. 686 responseThrowable = new DoNotRetryIOException(thrown); 687 } else { 688 responseThrowable = thrown; 689 } 690 691 ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null, 692 totalRequestSize, null, 0, this.callCleanup); 693 readParamsFailedCall.setResponse(null, null, responseThrowable, 694 msg + "; " + responseThrowable.getMessage()); 695 TraceUtil.setError(span, responseThrowable); 696 readParamsFailedCall.sendResponseIfReady(); 697 return; 698 } 699 700 int timeout = 0; 701 if (header.hasTimeout() && header.getTimeout() > 0) { 702 timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); 703 } 704 ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, 705 totalRequestSize, this.addr, timeout, this.callCleanup); 706 707 if (this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { 708 // unset span do that it's not closed in the finally block 709 span = null; 710 } else { 711 this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); 712 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 713 call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, 714 "Call queue is full on " + this.rpcServer.server.getServerName() 715 + ", too many items queued ?"); 716 TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 717 call.sendResponseIfReady(); 718 } 719 } finally { 720 if (span != null) { 721 span.end(); 722 } 723 } 724 } 725 726 protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException { 727 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1); 728 ServerCall.setExceptionResponse(e, msg, headerBuilder); 729 ByteBuffer headerBuf = 730 ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null); 731 BufferChain buf = new BufferChain(headerBuf); 732 return () -> buf; 733 } 734 735 private void doBadPreambleHandling(String msg) throws IOException { 736 doBadPreambleHandling(msg, new FatalConnectionException(msg)); 737 } 738 739 private void doBadPreambleHandling(String msg, Exception e) throws IOException { 740 SimpleRpcServer.LOG.warn(msg); 741 doRespond(getErrorResponse(msg, e)); 742 } 743 744 protected final void callCleanupIfNeeded() { 745 if (callCleanup != null) { 746 callCleanup.run(); 747 callCleanup = null; 748 } 749 } 750 751 protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException { 752 assert preambleBuffer.remaining() == 6; 753 for (int i = 0; i < RPC_HEADER.length; i++) { 754 if (RPC_HEADER[i] != preambleBuffer.get()) { 755 doBadPreambleHandling( 756 "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" 757 + Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from " 758 + toString()); 759 return false; 760 } 761 } 762 int version = preambleBuffer.get() & 0xFF; 763 byte authbyte = preambleBuffer.get(); 764 765 if (version != SimpleRpcServer.CURRENT_VERSION) { 766 String msg = getFatalConnectionString(version, authbyte); 767 doBadPreambleHandling(msg, new WrongVersionException(msg)); 768 return false; 769 } 770 this.provider = this.saslProviders.selectProvider(authbyte); 771 if (this.provider == null) { 772 String msg = getFatalConnectionString(version, authbyte); 773 doBadPreambleHandling(msg, new BadAuthException(msg)); 774 return false; 775 } 776 // TODO this is a wart while simple auth'n doesn't go through sasl. 777 if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) { 778 if (this.rpcServer.allowFallbackToSimpleAuth) { 779 this.rpcServer.metrics.authenticationFallback(); 780 authenticatedWithFallback = true; 781 } else { 782 AccessDeniedException ae = new AccessDeniedException("Authentication is required"); 783 doRespond(getErrorResponse(ae.getMessage(), ae)); 784 return false; 785 } 786 } 787 if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) { 788 doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, 789 null); 790 provider = saslProviders.getSimpleProvider(); 791 // client has already sent the initial Sasl message and we 792 // should ignore it. Both client and server should fall back 793 // to simple auth from now on. 794 skipInitialSaslHandshake = true; 795 } 796 useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider); 797 return true; 798 } 799 800 boolean isSimpleAuthentication() { 801 return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider; 802 } 803 804 public abstract boolean isConnectionOpen(); 805 806 public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md, 807 RequestHeader header, Message param, CellScanner cellScanner, long size, 808 InetAddress remoteAddress, int timeout, CallCleanup reqCleanup); 809 810 private static class ByteBuffByteInput extends ByteInput { 811 812 private ByteBuff buf; 813 private int offset; 814 private int length; 815 816 ByteBuffByteInput(ByteBuff buf, int offset, int length) { 817 this.buf = buf; 818 this.offset = offset; 819 this.length = length; 820 } 821 822 @Override 823 public byte read(int offset) { 824 return this.buf.get(getAbsoluteOffset(offset)); 825 } 826 827 private int getAbsoluteOffset(int offset) { 828 return this.offset + offset; 829 } 830 831 @Override 832 public int read(int offset, byte[] out, int outOffset, int len) { 833 this.buf.get(getAbsoluteOffset(offset), out, outOffset, len); 834 return len; 835 } 836 837 @Override 838 public int read(int offset, ByteBuffer out) { 839 int len = out.remaining(); 840 this.buf.get(out, getAbsoluteOffset(offset), len); 841 return len; 842 } 843 844 @Override 845 public int size() { 846 return this.length; 847 } 848 } 849}