001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER; 021 022import java.io.ByteArrayInputStream; 023import java.io.Closeable; 024import java.io.DataOutputStream; 025import java.io.IOException; 026import java.net.InetAddress; 027import java.net.InetSocketAddress; 028import java.nio.ByteBuffer; 029import java.nio.channels.Channels; 030import java.nio.channels.ReadableByteChannel; 031import java.security.GeneralSecurityException; 032import java.util.Objects; 033import java.util.Properties; 034 035import org.apache.commons.crypto.cipher.CryptoCipherFactory; 036import org.apache.commons.crypto.random.CryptoRandom; 037import org.apache.commons.crypto.random.CryptoRandomFactory; 038import org.apache.hadoop.hbase.CellScanner; 039import org.apache.hadoop.hbase.DoNotRetryIOException; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.apache.hadoop.hbase.client.VersionInfoUtil; 042import org.apache.hadoop.hbase.codec.Codec; 043import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 044import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 045import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 046import org.apache.hadoop.hbase.nio.ByteBuff; 047import org.apache.hadoop.hbase.nio.SingleByteBuff; 048import org.apache.hadoop.hbase.security.AccessDeniedException; 049import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; 050import org.apache.hadoop.hbase.security.SaslStatus; 051import org.apache.hadoop.hbase.security.SaslUtil; 052import org.apache.hadoop.hbase.security.User; 053import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider; 054import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders; 055import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider; 056import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 057import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; 058import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 059import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 060import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 061import org.apache.hbase.thirdparty.com.google.protobuf.Message; 062import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 063import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 064import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.io.BytesWritable; 073import org.apache.hadoop.io.IntWritable; 074import org.apache.hadoop.io.Writable; 075import org.apache.hadoop.io.WritableUtils; 076import org.apache.hadoop.io.compress.CompressionCodec; 077import org.apache.hadoop.security.UserGroupInformation; 078import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 079import org.apache.hadoop.security.authorize.AuthorizationException; 080import org.apache.hadoop.security.authorize.ProxyUsers; 081import org.apache.hadoop.security.token.SecretManager.InvalidToken; 082 083/** Reads calls from a connection and queues them for handling. */ 084@edu.umd.cs.findbugs.annotations.SuppressWarnings( 085 value="VO_VOLATILE_INCREMENT", 086 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") 087@InterfaceAudience.Private 088abstract class ServerRpcConnection implements Closeable { 089 /** */ 090 protected final RpcServer rpcServer; 091 // If the connection header has been read or not. 092 protected boolean connectionHeaderRead = false; 093 094 protected CallCleanup callCleanup; 095 096 // Cache the remote host & port info so that even if the socket is 097 // disconnected, we can say where it used to connect to. 098 protected String hostAddress; 099 protected int remotePort; 100 protected InetAddress addr; 101 protected ConnectionHeader connectionHeader; 102 103 /** 104 * Codec the client asked use. 105 */ 106 protected Codec codec; 107 /** 108 * Compression codec the client asked us use. 109 */ 110 protected CompressionCodec compressionCodec; 111 protected BlockingService service; 112 113 protected SaslServerAuthenticationProvider provider; 114 protected boolean saslContextEstablished; 115 protected boolean skipInitialSaslHandshake; 116 private ByteBuffer unwrappedData; 117 // When is this set? FindBugs wants to know! Says NP 118 private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); 119 protected boolean useSasl; 120 protected HBaseSaslRpcServer saslServer; 121 protected CryptoAES cryptoAES; 122 protected boolean useWrap = false; 123 protected boolean useCryptoAesWrap = false; 124 125 // was authentication allowed with a fallback to simple auth 126 protected boolean authenticatedWithFallback; 127 128 protected boolean retryImmediatelySupported = false; 129 130 protected User user = null; 131 protected UserGroupInformation ugi = null; 132 protected SaslServerAuthenticationProviders saslProviders = null; 133 134 public ServerRpcConnection(RpcServer rpcServer) { 135 this.rpcServer = rpcServer; 136 this.callCleanup = null; 137 this.saslProviders = SaslServerAuthenticationProviders.getInstance(rpcServer.getConf()); 138 } 139 140 @Override 141 public String toString() { 142 return getHostAddress() + ":" + remotePort; 143 } 144 145 public String getHostAddress() { 146 return hostAddress; 147 } 148 149 public InetAddress getHostInetAddress() { 150 return addr; 151 } 152 153 public int getRemotePort() { 154 return remotePort; 155 } 156 157 public VersionInfo getVersionInfo() { 158 if (connectionHeader.hasVersionInfo()) { 159 return connectionHeader.getVersionInfo(); 160 } 161 return null; 162 } 163 164 private String getFatalConnectionString(final int version, final byte authByte) { 165 return "serverVersion=" + RpcServer.CURRENT_VERSION + 166 ", clientVersion=" + version + ", authMethod=" + authByte + 167 // The provider may be null if we failed to parse the header of the request 168 ", authName=" + (provider == null ? "unknown" : provider.getSaslAuthMethod().getName()) + 169 " from " + toString(); 170 } 171 172 /** 173 * Set up cell block codecs 174 * @throws FatalConnectionException 175 */ 176 private void setupCellBlockCodecs(final ConnectionHeader header) 177 throws FatalConnectionException { 178 // TODO: Plug in other supported decoders. 179 if (!header.hasCellBlockCodecClass()) return; 180 String className = header.getCellBlockCodecClass(); 181 if (className == null || className.length() == 0) return; 182 try { 183 this.codec = (Codec)Class.forName(className).getDeclaredConstructor().newInstance(); 184 } catch (Exception e) { 185 throw new UnsupportedCellCodecException(className, e); 186 } 187 if (!header.hasCellBlockCompressorClass()) return; 188 className = header.getCellBlockCompressorClass(); 189 try { 190 this.compressionCodec = 191 (CompressionCodec)Class.forName(className).getDeclaredConstructor().newInstance(); 192 } catch (Exception e) { 193 throw new UnsupportedCompressionCodecException(className, e); 194 } 195 } 196 197 /** 198 * Set up cipher for rpc encryption with Apache Commons Crypto 199 * 200 * @throws FatalConnectionException 201 */ 202 private void setupCryptoCipher(final ConnectionHeader header, 203 RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) 204 throws FatalConnectionException { 205 // If simple auth, return 206 if (saslServer == null) return; 207 // check if rpc encryption with Crypto AES 208 String qop = saslServer.getNegotiatedQop(); 209 boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY 210 .getSaslQop().equalsIgnoreCase(qop); 211 boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean( 212 "hbase.rpc.crypto.encryption.aes.enabled", false); 213 if (!isCryptoAesEncryption) return; 214 if (!header.hasRpcCryptoCipherTransformation()) return; 215 String transformation = header.getRpcCryptoCipherTransformation(); 216 if (transformation == null || transformation.length() == 0) return; 217 // Negotiates AES based on complete saslServer. 218 // The Crypto metadata need to be encrypted and send to client. 219 Properties properties = new Properties(); 220 // the property for SecureRandomFactory 221 properties.setProperty(CryptoRandomFactory.CLASSES_KEY, 222 this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random", 223 "org.apache.commons.crypto.random.JavaCryptoRandom")); 224 // the property for cipher class 225 properties.setProperty(CryptoCipherFactory.CLASSES_KEY, 226 this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class", 227 "org.apache.commons.crypto.cipher.JceCipher")); 228 229 int cipherKeyBits = this.rpcServer.conf.getInt( 230 "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128); 231 // generate key and iv 232 if (cipherKeyBits % 8 != 0) { 233 throw new IllegalArgumentException("The AES cipher key size in bits" + 234 " should be a multiple of byte"); 235 } 236 int len = cipherKeyBits / 8; 237 byte[] inKey = new byte[len]; 238 byte[] outKey = new byte[len]; 239 byte[] inIv = new byte[len]; 240 byte[] outIv = new byte[len]; 241 242 try { 243 // generate the cipher meta data with SecureRandom 244 CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); 245 secureRandom.nextBytes(inKey); 246 secureRandom.nextBytes(outKey); 247 secureRandom.nextBytes(inIv); 248 secureRandom.nextBytes(outIv); 249 250 // create CryptoAES for server 251 cryptoAES = new CryptoAES(transformation, properties, 252 inKey, outKey, inIv, outIv); 253 // create SaslCipherMeta and send to client, 254 // for client, the [inKey, outKey], [inIv, outIv] should be reversed 255 RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder(); 256 ccmBuilder.setTransformation(transformation); 257 ccmBuilder.setInIv(getByteString(outIv)); 258 ccmBuilder.setInKey(getByteString(outKey)); 259 ccmBuilder.setOutIv(getByteString(inIv)); 260 ccmBuilder.setOutKey(getByteString(inKey)); 261 chrBuilder.setCryptoCipherMeta(ccmBuilder); 262 useCryptoAesWrap = true; 263 } catch (GeneralSecurityException | IOException ex) { 264 throw new UnsupportedCryptoException(ex.getMessage(), ex); 265 } 266 } 267 268 private ByteString getByteString(byte[] bytes) { 269 // return singleton to reduce object allocation 270 return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); 271 } 272 273 private UserGroupInformation createUser(ConnectionHeader head) { 274 UserGroupInformation ugi = null; 275 276 if (!head.hasUserInfo()) { 277 return null; 278 } 279 UserInformation userInfoProto = head.getUserInfo(); 280 String effectiveUser = null; 281 if (userInfoProto.hasEffectiveUser()) { 282 effectiveUser = userInfoProto.getEffectiveUser(); 283 } 284 String realUser = null; 285 if (userInfoProto.hasRealUser()) { 286 realUser = userInfoProto.getRealUser(); 287 } 288 if (effectiveUser != null) { 289 if (realUser != null) { 290 UserGroupInformation realUserUgi = 291 UserGroupInformation.createRemoteUser(realUser); 292 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi); 293 } else { 294 ugi = UserGroupInformation.createRemoteUser(effectiveUser); 295 } 296 } 297 return ugi; 298 } 299 300 protected final void disposeSasl() { 301 if (saslServer != null) { 302 saslServer.dispose(); 303 saslServer = null; 304 } 305 } 306 307 /** 308 * No protobuf encoding of raw sasl messages 309 */ 310 protected final void doRawSaslReply(SaslStatus status, Writable rv, 311 String errorClass, String error) throws IOException { 312 BufferChain bc; 313 // In my testing, have noticed that sasl messages are usually 314 // in the ballpark of 100-200. That's why the initial capacity is 256. 315 try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256); 316 DataOutputStream out = new DataOutputStream(saslResponse)) { 317 out.writeInt(status.state); // write status 318 if (status == SaslStatus.SUCCESS) { 319 rv.write(out); 320 } else { 321 WritableUtils.writeString(out, errorClass); 322 WritableUtils.writeString(out, error); 323 } 324 bc = new BufferChain(saslResponse.getByteBuffer()); 325 } 326 doRespond(() -> bc); 327 } 328 329 public void saslReadAndProcess(ByteBuff saslToken) throws IOException, 330 InterruptedException { 331 if (saslContextEstablished) { 332 RpcServer.LOG.trace("Read input token of size={} for processing by saslServer.unwrap()", 333 saslToken.limit()); 334 if (!useWrap) { 335 processOneRpc(saslToken); 336 } else { 337 byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes(); 338 byte [] plaintextData; 339 if (useCryptoAesWrap) { 340 // unwrap with CryptoAES 341 plaintextData = cryptoAES.unwrap(b, 0, b.length); 342 } else { 343 plaintextData = saslServer.unwrap(b, 0, b.length); 344 } 345 processUnwrappedData(plaintextData); 346 } 347 } else { 348 byte[] replyToken; 349 try { 350 if (saslServer == null) { 351 try { 352 saslServer = 353 new HBaseSaslRpcServer(provider, rpcServer.saslProps, rpcServer.secretManager); 354 } catch (Exception e){ 355 RpcServer.LOG.error("Error when trying to create instance of HBaseSaslRpcServer " 356 + "with sasl provider: " + provider, e); 357 throw e; 358 } 359 RpcServer.LOG.debug("Created SASL server with mechanism={}", 360 provider.getSaslAuthMethod().getAuthMethod()); 361 } 362 RpcServer.LOG.debug("Read input token of size={} for processing by saslServer." + 363 "evaluateResponse()", saslToken.limit()); 364 replyToken = saslServer.evaluateResponse(saslToken.hasArray()? 365 saslToken.array() : saslToken.toBytes()); 366 } catch (IOException e) { 367 IOException sendToClient = e; 368 Throwable cause = e; 369 while (cause != null) { 370 if (cause instanceof InvalidToken) { 371 sendToClient = (InvalidToken) cause; 372 break; 373 } 374 cause = cause.getCause(); 375 } 376 doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(), 377 sendToClient.getLocalizedMessage()); 378 this.rpcServer.metrics.authenticationFailure(); 379 String clientIP = this.toString(); 380 // attempting user could be null 381 RpcServer.AUDITLOG 382 .warn("{} {}: {}", RpcServer.AUTH_FAILED_FOR, clientIP, saslServer.getAttemptingUser()); 383 throw e; 384 } 385 if (replyToken != null) { 386 if (RpcServer.LOG.isDebugEnabled()) { 387 RpcServer.LOG.debug("Will send token of size " + replyToken.length 388 + " from saslServer."); 389 } 390 doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, 391 null); 392 } 393 if (saslServer.isComplete()) { 394 String qop = saslServer.getNegotiatedQop(); 395 useWrap = qop != null && !"auth".equalsIgnoreCase(qop); 396 ugi = provider.getAuthorizedUgi(saslServer.getAuthorizationID(), 397 this.rpcServer.secretManager); 398 RpcServer.LOG.debug( 399 "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", 400 ugi, qop); 401 this.rpcServer.metrics.authenticationSuccess(); 402 RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); 403 saslContextEstablished = true; 404 } 405 } 406 } 407 408 private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException { 409 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); 410 // Read all RPCs contained in the inBuf, even partial ones 411 while (true) { 412 int count; 413 if (unwrappedDataLengthBuffer.remaining() > 0) { 414 count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer); 415 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) 416 return; 417 } 418 419 if (unwrappedData == null) { 420 unwrappedDataLengthBuffer.flip(); 421 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); 422 423 if (unwrappedDataLength == RpcClient.PING_CALL_ID) { 424 if (RpcServer.LOG.isDebugEnabled()) 425 RpcServer.LOG.debug("Received ping message"); 426 unwrappedDataLengthBuffer.clear(); 427 continue; // ping message 428 } 429 unwrappedData = ByteBuffer.allocate(unwrappedDataLength); 430 } 431 432 count = this.rpcServer.channelRead(ch, unwrappedData); 433 if (count <= 0 || unwrappedData.remaining() > 0) 434 return; 435 436 if (unwrappedData.remaining() == 0) { 437 unwrappedDataLengthBuffer.clear(); 438 unwrappedData.flip(); 439 processOneRpc(new SingleByteBuff(unwrappedData)); 440 unwrappedData = null; 441 } 442 } 443 } 444 445 public void processOneRpc(ByteBuff buf) throws IOException, 446 InterruptedException { 447 if (connectionHeaderRead) { 448 processRequest(buf); 449 } else { 450 processConnectionHeader(buf); 451 this.connectionHeaderRead = true; 452 if (!authorizeConnection()) { 453 // Throw FatalConnectionException wrapping ACE so client does right thing and closes 454 // down the connection instead of trying to read non-existent retun. 455 throw new AccessDeniedException("Connection from " + this + " for service " + 456 connectionHeader.getServiceName() + " is unauthorized for user: " + ugi); 457 } 458 this.user = this.rpcServer.userProvider.create(this.ugi); 459 } 460 } 461 462 private boolean authorizeConnection() throws IOException { 463 try { 464 // If auth method is DIGEST, the token was obtained by the 465 // real user for the effective user, therefore not required to 466 // authorize real user. doAs is allowed only for simple or kerberos 467 // authentication 468 if (ugi != null && ugi.getRealUser() != null 469 && provider.supportsProtocolAuthentication()) { 470 ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf); 471 } 472 this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress()); 473 this.rpcServer.metrics.authorizationSuccess(); 474 } catch (AuthorizationException ae) { 475 if (RpcServer.LOG.isDebugEnabled()) { 476 RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae); 477 } 478 this.rpcServer.metrics.authorizationFailure(); 479 doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae))); 480 return false; 481 } 482 return true; 483 } 484 485 // Reads the connection header following version 486 private void processConnectionHeader(ByteBuff buf) throws IOException { 487 if (buf.hasArray()) { 488 this.connectionHeader = ConnectionHeader.parseFrom(buf.array()); 489 } else { 490 CodedInputStream cis = UnsafeByteOperations.unsafeWrap( 491 new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput(); 492 cis.enableAliasing(true); 493 this.connectionHeader = ConnectionHeader.parseFrom(cis); 494 } 495 String serviceName = connectionHeader.getServiceName(); 496 if (serviceName == null) throw new EmptyServiceNameException(); 497 this.service = RpcServer.getService(this.rpcServer.services, serviceName); 498 if (this.service == null) throw new UnknownServiceException(serviceName); 499 setupCellBlockCodecs(this.connectionHeader); 500 RPCProtos.ConnectionHeaderResponse.Builder chrBuilder = 501 RPCProtos.ConnectionHeaderResponse.newBuilder(); 502 setupCryptoCipher(this.connectionHeader, chrBuilder); 503 responseConnectionHeader(chrBuilder); 504 UserGroupInformation protocolUser = createUser(connectionHeader); 505 if (!useSasl) { 506 ugi = protocolUser; 507 if (ugi != null) { 508 ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE); 509 } 510 // audit logging for SASL authenticated users happens in saslReadAndProcess() 511 if (authenticatedWithFallback) { 512 RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", 513 ugi, getHostAddress()); 514 } 515 } else { 516 // user is authenticated 517 ugi.setAuthenticationMethod(provider.getSaslAuthMethod().getAuthMethod()); 518 //Now we check if this is a proxy user case. If the protocol user is 519 //different from the 'user', it is a proxy user scenario. However, 520 //this is not allowed if user authenticated with DIGEST. 521 if ((protocolUser != null) 522 && (!protocolUser.getUserName().equals(ugi.getUserName()))) { 523 if (!provider.supportsProtocolAuthentication()) { 524 // Not allowed to doAs if token authentication is used 525 throw new AccessDeniedException("Authenticated user (" + ugi 526 + ") doesn't match what the client claims to be (" 527 + protocolUser + ")"); 528 } else { 529 // Effective user can be different from authenticated user 530 // for simple auth or kerberos auth 531 // The user is the real user. Now we create a proxy user 532 UserGroupInformation realUser = ugi; 533 ugi = UserGroupInformation.createProxyUser(protocolUser 534 .getUserName(), realUser); 535 // Now the user is a proxy user, set Authentication method Proxy. 536 ugi.setAuthenticationMethod(AuthenticationMethod.PROXY); 537 } 538 } 539 } 540 String version; 541 if (this.connectionHeader.hasVersionInfo()) { 542 // see if this connection will support RetryImmediatelyException 543 this.retryImmediatelySupported = 544 VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2); 545 version = this.connectionHeader.getVersionInfo().getVersion(); 546 } else { 547 version = "UNKNOWN"; 548 } 549 RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}", 550 this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName); 551 } 552 553 /** 554 * Send the response for connection header 555 */ 556 private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) 557 throws FatalConnectionException { 558 // Response the connection header if Crypto AES is enabled 559 if (!chrBuilder.hasCryptoCipherMeta()) return; 560 try { 561 byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray(); 562 // encrypt the Crypto AES cipher meta data with sasl server, and send to client 563 byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4]; 564 Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4); 565 Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length); 566 byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length); 567 BufferChain bc; 568 try (ByteBufferOutputStream response = new ByteBufferOutputStream(wrapped.length + 4); 569 DataOutputStream out = new DataOutputStream(response)) { 570 out.writeInt(wrapped.length); 571 out.write(wrapped); 572 bc = new BufferChain(response.getByteBuffer()); 573 } 574 doRespond(() -> bc); 575 } catch (IOException ex) { 576 throw new UnsupportedCryptoException(ex.getMessage(), ex); 577 } 578 } 579 580 protected abstract void doRespond(RpcResponse resp) throws IOException; 581 582 /** 583 * @param buf 584 * Has the request header and the request param and optionally 585 * encoded data buffer all in this one array. 586 * @throws IOException 587 * @throws InterruptedException 588 */ 589 protected void processRequest(ByteBuff buf) throws IOException, 590 InterruptedException { 591 long totalRequestSize = buf.limit(); 592 int offset = 0; 593 // Here we read in the header. We avoid having pb 594 // do its default 4k allocation for CodedInputStream. We force it to use 595 // backing array. 596 CodedInputStream cis; 597 if (buf.hasArray()) { 598 cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput(); 599 } else { 600 cis = UnsafeByteOperations 601 .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput(); 602 } 603 cis.enableAliasing(true); 604 int headerSize = cis.readRawVarint32(); 605 offset = cis.getTotalBytesRead(); 606 Message.Builder builder = RequestHeader.newBuilder(); 607 ProtobufUtil.mergeFrom(builder, cis, headerSize); 608 RequestHeader header = (RequestHeader) builder.build(); 609 offset += headerSize; 610 int id = header.getCallId(); 611 if (RpcServer.LOG.isTraceEnabled()) { 612 RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) 613 + " totalRequestSize: " + totalRequestSize + " bytes"); 614 } 615 // Enforcing the call queue size, this triggers a retry in the client 616 // This is a bit late to be doing this check - we have already read in the 617 // total request. 618 if ((totalRequestSize + 619 this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { 620 final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null, 621 totalRequestSize, null, 0, this.callCleanup); 622 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 623 callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, 624 "Call queue is full on " + this.rpcServer.server.getServerName() + 625 ", is hbase.ipc.server.max.callqueue.size too small?"); 626 callTooBig.sendResponseIfReady(); 627 return; 628 } 629 MethodDescriptor md = null; 630 Message param = null; 631 CellScanner cellScanner = null; 632 try { 633 if (header.hasRequestParam() && header.getRequestParam()) { 634 md = this.service.getDescriptorForType().findMethodByName( 635 header.getMethodName()); 636 if (md == null) 637 throw new UnsupportedOperationException(header.getMethodName()); 638 builder = this.service.getRequestPrototype(md).newBuilderForType(); 639 cis.resetSizeCounter(); 640 int paramSize = cis.readRawVarint32(); 641 offset += cis.getTotalBytesRead(); 642 if (builder != null) { 643 ProtobufUtil.mergeFrom(builder, cis, paramSize); 644 param = builder.build(); 645 } 646 offset += paramSize; 647 } else { 648 // currently header must have request param, so we directly throw 649 // exception here 650 String msg = "Invalid request header: " 651 + TextFormat.shortDebugString(header) 652 + ", should have param set in it"; 653 RpcServer.LOG.warn(msg); 654 throw new DoNotRetryIOException(msg); 655 } 656 if (header.hasCellBlockMeta()) { 657 buf.position(offset); 658 ByteBuff dup = buf.duplicate(); 659 dup.limit(offset + header.getCellBlockMeta().getLength()); 660 cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers( 661 this.codec, this.compressionCodec, dup); 662 } 663 } catch (Throwable t) { 664 InetSocketAddress address = this.rpcServer.getListenerAddress(); 665 String msg = (address != null ? address : "(channel closed)") 666 + " is unable to read call parameter from client " 667 + getHostAddress(); 668 RpcServer.LOG.warn(msg, t); 669 670 this.rpcServer.metrics.exception(t); 671 672 // probably the hbase hadoop version does not match the running hadoop 673 // version 674 if (t instanceof LinkageError) { 675 t = new DoNotRetryIOException(t); 676 } 677 // If the method is not present on the server, do not retry. 678 if (t instanceof UnsupportedOperationException) { 679 t = new DoNotRetryIOException(t); 680 } 681 682 ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null, 683 totalRequestSize, null, 0, this.callCleanup); 684 readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage()); 685 readParamsFailedCall.sendResponseIfReady(); 686 return; 687 } 688 689 int timeout = 0; 690 if (header.hasTimeout() && header.getTimeout() > 0) { 691 timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); 692 } 693 ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize, 694 this.addr, timeout, this.callCleanup); 695 696 if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { 697 this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); 698 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 699 call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, 700 "Call queue is full on " + this.rpcServer.server.getServerName() + 701 ", too many items queued ?"); 702 call.sendResponseIfReady(); 703 } 704 } 705 706 protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException { 707 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1); 708 ServerCall.setExceptionResponse(e, msg, headerBuilder); 709 ByteBuffer headerBuf = 710 ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null); 711 BufferChain buf = new BufferChain(headerBuf); 712 return () -> buf; 713 } 714 715 private void doBadPreambleHandling(String msg) throws IOException { 716 doBadPreambleHandling(msg, new FatalConnectionException(msg)); 717 } 718 719 private void doBadPreambleHandling(String msg, Exception e) throws IOException { 720 SimpleRpcServer.LOG.warn(msg); 721 doRespond(getErrorResponse(msg, e)); 722 } 723 724 protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException { 725 assert preambleBuffer.remaining() == 6; 726 for (int i = 0; i < RPC_HEADER.length; i++) { 727 if (RPC_HEADER[i] != preambleBuffer.get()) { 728 doBadPreambleHandling( 729 "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" + 730 Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from " + 731 toString()); 732 return false; 733 } 734 } 735 int version = preambleBuffer.get() & 0xFF; 736 byte authbyte = preambleBuffer.get(); 737 738 if (version != SimpleRpcServer.CURRENT_VERSION) { 739 String msg = getFatalConnectionString(version, authbyte); 740 doBadPreambleHandling(msg, new WrongVersionException(msg)); 741 return false; 742 } 743 this.provider = this.saslProviders.selectProvider(authbyte); 744 if (this.provider == null) { 745 String msg = getFatalConnectionString(version, authbyte); 746 doBadPreambleHandling(msg, new BadAuthException(msg)); 747 return false; 748 } 749 // TODO this is a wart while simple auth'n doesn't go through sasl. 750 if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) { 751 if (this.rpcServer.allowFallbackToSimpleAuth) { 752 this.rpcServer.metrics.authenticationFallback(); 753 authenticatedWithFallback = true; 754 } else { 755 AccessDeniedException ae = new AccessDeniedException("Authentication is required"); 756 doRespond(getErrorResponse(ae.getMessage(), ae)); 757 return false; 758 } 759 } 760 if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) { 761 doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, 762 null); 763 provider = saslProviders.getSimpleProvider(); 764 // client has already sent the initial Sasl message and we 765 // should ignore it. Both client and server should fall back 766 // to simple auth from now on. 767 skipInitialSaslHandshake = true; 768 } 769 useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider); 770 return true; 771 } 772 773 boolean isSimpleAuthentication() { 774 return Objects.requireNonNull(provider) instanceof SimpleSaslServerAuthenticationProvider; 775 } 776 777 public abstract boolean isConnectionOpen(); 778 779 public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md, 780 RequestHeader header, Message param, CellScanner cellScanner, long size, 781 InetAddress remoteAddress, int timeout, CallCleanup reqCleanup); 782 783 private static class ByteBuffByteInput extends ByteInput { 784 785 private ByteBuff buf; 786 private int offset; 787 private int length; 788 789 ByteBuffByteInput(ByteBuff buf, int offset, int length) { 790 this.buf = buf; 791 this.offset = offset; 792 this.length = length; 793 } 794 795 @Override 796 public byte read(int offset) { 797 return this.buf.get(getAbsoluteOffset(offset)); 798 } 799 800 private int getAbsoluteOffset(int offset) { 801 return this.offset + offset; 802 } 803 804 @Override 805 public int read(int offset, byte[] out, int outOffset, int len) { 806 this.buf.get(getAbsoluteOffset(offset), out, outOffset, len); 807 return len; 808 } 809 810 @Override 811 public int read(int offset, ByteBuffer out) { 812 int len = out.remaining(); 813 this.buf.get(out, getAbsoluteOffset(offset), len); 814 return len; 815 } 816 817 @Override 818 public int size() { 819 return this.length; 820 } 821 } 822}