001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.HConstants.RPC_HEADER; 021 022import java.io.ByteArrayInputStream; 023import java.io.Closeable; 024import java.io.DataOutputStream; 025import java.io.IOException; 026import java.net.InetAddress; 027import java.net.InetSocketAddress; 028import java.nio.ByteBuffer; 029import java.nio.channels.Channels; 030import java.nio.channels.ReadableByteChannel; 031import java.security.GeneralSecurityException; 032import java.util.Properties; 033 034import org.apache.commons.crypto.cipher.CryptoCipherFactory; 035import org.apache.commons.crypto.random.CryptoRandom; 036import org.apache.commons.crypto.random.CryptoRandomFactory; 037import org.apache.hadoop.hbase.CellScanner; 038import org.apache.hadoop.hbase.DoNotRetryIOException; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.apache.hadoop.hbase.client.VersionInfoUtil; 041import org.apache.hadoop.hbase.codec.Codec; 042import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 043import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 044import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 045import org.apache.hadoop.hbase.nio.ByteBuff; 046import org.apache.hadoop.hbase.nio.SingleByteBuff; 047import org.apache.hadoop.hbase.security.AccessDeniedException; 048import org.apache.hadoop.hbase.security.AuthMethod; 049import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; 050import org.apache.hadoop.hbase.security.SaslStatus; 051import org.apache.hadoop.hbase.security.SaslUtil; 052import org.apache.hadoop.hbase.security.User; 053import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 054import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; 055import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 056import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 057import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 058import org.apache.hbase.thirdparty.com.google.protobuf.Message; 059import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 060import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 061import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 068import org.apache.hadoop.hbase.util.Bytes; 069import org.apache.hadoop.io.BytesWritable; 070import org.apache.hadoop.io.IntWritable; 071import org.apache.hadoop.io.Writable; 072import org.apache.hadoop.io.WritableUtils; 073import org.apache.hadoop.io.compress.CompressionCodec; 074import org.apache.hadoop.security.UserGroupInformation; 075import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 076import org.apache.hadoop.security.authorize.AuthorizationException; 077import org.apache.hadoop.security.authorize.ProxyUsers; 078import org.apache.hadoop.security.token.SecretManager.InvalidToken; 079import org.apache.hadoop.security.token.TokenIdentifier; 080 081/** Reads calls from a connection and queues them for handling. */ 082@edu.umd.cs.findbugs.annotations.SuppressWarnings( 083 value="VO_VOLATILE_INCREMENT", 084 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") 085@InterfaceAudience.Private 086abstract class ServerRpcConnection implements Closeable { 087 /** */ 088 protected final RpcServer rpcServer; 089 // If the connection header has been read or not. 090 protected boolean connectionHeaderRead = false; 091 092 protected CallCleanup callCleanup; 093 094 // Cache the remote host & port info so that even if the socket is 095 // disconnected, we can say where it used to connect to. 096 protected String hostAddress; 097 protected int remotePort; 098 protected InetAddress addr; 099 protected ConnectionHeader connectionHeader; 100 101 /** 102 * Codec the client asked use. 103 */ 104 protected Codec codec; 105 /** 106 * Compression codec the client asked us use. 107 */ 108 protected CompressionCodec compressionCodec; 109 protected BlockingService service; 110 111 protected AuthMethod authMethod; 112 protected boolean saslContextEstablished; 113 protected boolean skipInitialSaslHandshake; 114 private ByteBuffer unwrappedData; 115 // When is this set? FindBugs wants to know! Says NP 116 private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); 117 protected boolean useSasl; 118 protected HBaseSaslRpcServer saslServer; 119 protected CryptoAES cryptoAES; 120 protected boolean useWrap = false; 121 protected boolean useCryptoAesWrap = false; 122 123 // was authentication allowed with a fallback to simple auth 124 protected boolean authenticatedWithFallback; 125 126 protected boolean retryImmediatelySupported = false; 127 128 protected User user = null; 129 protected UserGroupInformation ugi = null; 130 131 public ServerRpcConnection(RpcServer rpcServer) { 132 this.rpcServer = rpcServer; 133 this.callCleanup = null; 134 } 135 136 @Override 137 public String toString() { 138 return getHostAddress() + ":" + remotePort; 139 } 140 141 public String getHostAddress() { 142 return hostAddress; 143 } 144 145 public InetAddress getHostInetAddress() { 146 return addr; 147 } 148 149 public int getRemotePort() { 150 return remotePort; 151 } 152 153 public VersionInfo getVersionInfo() { 154 if (connectionHeader.hasVersionInfo()) { 155 return connectionHeader.getVersionInfo(); 156 } 157 return null; 158 } 159 160 private String getFatalConnectionString(final int version, final byte authByte) { 161 return "serverVersion=" + RpcServer.CURRENT_VERSION + 162 ", clientVersion=" + version + ", authMethod=" + authByte + 163 ", authSupported=" + (authMethod != null) + " from " + toString(); 164 } 165 166 private UserGroupInformation getAuthorizedUgi(String authorizedId) 167 throws IOException { 168 UserGroupInformation authorizedUgi; 169 if (authMethod == AuthMethod.DIGEST) { 170 TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId, 171 this.rpcServer.secretManager); 172 authorizedUgi = tokenId.getUser(); 173 if (authorizedUgi == null) { 174 throw new AccessDeniedException( 175 "Can't retrieve username from tokenIdentifier."); 176 } 177 authorizedUgi.addTokenIdentifier(tokenId); 178 } else { 179 authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId); 180 } 181 authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod()); 182 return authorizedUgi; 183 } 184 185 /** 186 * Set up cell block codecs 187 * @throws FatalConnectionException 188 */ 189 private void setupCellBlockCodecs(final ConnectionHeader header) 190 throws FatalConnectionException { 191 // TODO: Plug in other supported decoders. 192 if (!header.hasCellBlockCodecClass()) return; 193 String className = header.getCellBlockCodecClass(); 194 if (className == null || className.length() == 0) return; 195 try { 196 this.codec = (Codec)Class.forName(className).getDeclaredConstructor().newInstance(); 197 } catch (Exception e) { 198 throw new UnsupportedCellCodecException(className, e); 199 } 200 if (!header.hasCellBlockCompressorClass()) return; 201 className = header.getCellBlockCompressorClass(); 202 try { 203 this.compressionCodec = 204 (CompressionCodec)Class.forName(className).getDeclaredConstructor().newInstance(); 205 } catch (Exception e) { 206 throw new UnsupportedCompressionCodecException(className, e); 207 } 208 } 209 210 /** 211 * Set up cipher for rpc encryption with Apache Commons Crypto 212 * 213 * @throws FatalConnectionException 214 */ 215 private void setupCryptoCipher(final ConnectionHeader header, 216 RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) 217 throws FatalConnectionException { 218 // If simple auth, return 219 if (saslServer == null) return; 220 // check if rpc encryption with Crypto AES 221 String qop = saslServer.getNegotiatedQop(); 222 boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY 223 .getSaslQop().equalsIgnoreCase(qop); 224 boolean isCryptoAesEncryption = isEncryption && this.rpcServer.conf.getBoolean( 225 "hbase.rpc.crypto.encryption.aes.enabled", false); 226 if (!isCryptoAesEncryption) return; 227 if (!header.hasRpcCryptoCipherTransformation()) return; 228 String transformation = header.getRpcCryptoCipherTransformation(); 229 if (transformation == null || transformation.length() == 0) return; 230 // Negotiates AES based on complete saslServer. 231 // The Crypto metadata need to be encrypted and send to client. 232 Properties properties = new Properties(); 233 // the property for SecureRandomFactory 234 properties.setProperty(CryptoRandomFactory.CLASSES_KEY, 235 this.rpcServer.conf.get("hbase.crypto.sasl.encryption.aes.crypto.random", 236 "org.apache.commons.crypto.random.JavaCryptoRandom")); 237 // the property for cipher class 238 properties.setProperty(CryptoCipherFactory.CLASSES_KEY, 239 this.rpcServer.conf.get("hbase.rpc.crypto.encryption.aes.cipher.class", 240 "org.apache.commons.crypto.cipher.JceCipher")); 241 242 int cipherKeyBits = this.rpcServer.conf.getInt( 243 "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128); 244 // generate key and iv 245 if (cipherKeyBits % 8 != 0) { 246 throw new IllegalArgumentException("The AES cipher key size in bits" + 247 " should be a multiple of byte"); 248 } 249 int len = cipherKeyBits / 8; 250 byte[] inKey = new byte[len]; 251 byte[] outKey = new byte[len]; 252 byte[] inIv = new byte[len]; 253 byte[] outIv = new byte[len]; 254 255 try { 256 // generate the cipher meta data with SecureRandom 257 CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties); 258 secureRandom.nextBytes(inKey); 259 secureRandom.nextBytes(outKey); 260 secureRandom.nextBytes(inIv); 261 secureRandom.nextBytes(outIv); 262 263 // create CryptoAES for server 264 cryptoAES = new CryptoAES(transformation, properties, 265 inKey, outKey, inIv, outIv); 266 // create SaslCipherMeta and send to client, 267 // for client, the [inKey, outKey], [inIv, outIv] should be reversed 268 RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder(); 269 ccmBuilder.setTransformation(transformation); 270 ccmBuilder.setInIv(getByteString(outIv)); 271 ccmBuilder.setInKey(getByteString(outKey)); 272 ccmBuilder.setOutIv(getByteString(inIv)); 273 ccmBuilder.setOutKey(getByteString(inKey)); 274 chrBuilder.setCryptoCipherMeta(ccmBuilder); 275 useCryptoAesWrap = true; 276 } catch (GeneralSecurityException | IOException ex) { 277 throw new UnsupportedCryptoException(ex.getMessage(), ex); 278 } 279 } 280 281 private ByteString getByteString(byte[] bytes) { 282 // return singleton to reduce object allocation 283 return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes); 284 } 285 286 private UserGroupInformation createUser(ConnectionHeader head) { 287 UserGroupInformation ugi = null; 288 289 if (!head.hasUserInfo()) { 290 return null; 291 } 292 UserInformation userInfoProto = head.getUserInfo(); 293 String effectiveUser = null; 294 if (userInfoProto.hasEffectiveUser()) { 295 effectiveUser = userInfoProto.getEffectiveUser(); 296 } 297 String realUser = null; 298 if (userInfoProto.hasRealUser()) { 299 realUser = userInfoProto.getRealUser(); 300 } 301 if (effectiveUser != null) { 302 if (realUser != null) { 303 UserGroupInformation realUserUgi = 304 UserGroupInformation.createRemoteUser(realUser); 305 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi); 306 } else { 307 ugi = UserGroupInformation.createRemoteUser(effectiveUser); 308 } 309 } 310 return ugi; 311 } 312 313 protected final void disposeSasl() { 314 if (saslServer != null) { 315 saslServer.dispose(); 316 saslServer = null; 317 } 318 } 319 320 /** 321 * No protobuf encoding of raw sasl messages 322 */ 323 protected final void doRawSaslReply(SaslStatus status, Writable rv, 324 String errorClass, String error) throws IOException { 325 BufferChain bc; 326 // In my testing, have noticed that sasl messages are usually 327 // in the ballpark of 100-200. That's why the initial capacity is 256. 328 try (ByteBufferOutputStream saslResponse = new ByteBufferOutputStream(256); 329 DataOutputStream out = new DataOutputStream(saslResponse)) { 330 out.writeInt(status.state); // write status 331 if (status == SaslStatus.SUCCESS) { 332 rv.write(out); 333 } else { 334 WritableUtils.writeString(out, errorClass); 335 WritableUtils.writeString(out, error); 336 } 337 bc = new BufferChain(saslResponse.getByteBuffer()); 338 } 339 doRespond(() -> bc); 340 } 341 342 public void saslReadAndProcess(ByteBuff saslToken) throws IOException, 343 InterruptedException { 344 if (saslContextEstablished) { 345 RpcServer.LOG.trace("Read input token of size={} for processing by saslServer.unwrap()", 346 saslToken.limit()); 347 if (!useWrap) { 348 processOneRpc(saslToken); 349 } else { 350 byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes(); 351 byte [] plaintextData; 352 if (useCryptoAesWrap) { 353 // unwrap with CryptoAES 354 plaintextData = cryptoAES.unwrap(b, 0, b.length); 355 } else { 356 plaintextData = saslServer.unwrap(b, 0, b.length); 357 } 358 processUnwrappedData(plaintextData); 359 } 360 } else { 361 byte[] replyToken; 362 try { 363 if (saslServer == null) { 364 saslServer = 365 new HBaseSaslRpcServer(authMethod, rpcServer.saslProps, rpcServer.secretManager); 366 RpcServer.LOG.debug("Created SASL server with mechanism={}", 367 authMethod.getMechanismName()); 368 } 369 RpcServer.LOG.debug("Read input token of size={} for processing by saslServer." + 370 "evaluateResponse()", saslToken.limit()); 371 replyToken = saslServer.evaluateResponse(saslToken.hasArray()? 372 saslToken.array() : saslToken.toBytes()); 373 } catch (IOException e) { 374 IOException sendToClient = e; 375 Throwable cause = e; 376 while (cause != null) { 377 if (cause instanceof InvalidToken) { 378 sendToClient = (InvalidToken) cause; 379 break; 380 } 381 cause = cause.getCause(); 382 } 383 doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(), 384 sendToClient.getLocalizedMessage()); 385 this.rpcServer.metrics.authenticationFailure(); 386 String clientIP = this.toString(); 387 // attempting user could be null 388 RpcServer.AUDITLOG 389 .warn(RpcServer.AUTH_FAILED_FOR + clientIP + ":" + saslServer.getAttemptingUser()); 390 throw e; 391 } 392 if (replyToken != null) { 393 if (RpcServer.LOG.isDebugEnabled()) { 394 RpcServer.LOG.debug("Will send token of size " + replyToken.length 395 + " from saslServer."); 396 } 397 doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, 398 null); 399 } 400 if (saslServer.isComplete()) { 401 String qop = saslServer.getNegotiatedQop(); 402 useWrap = qop != null && !"auth".equalsIgnoreCase(qop); 403 ugi = getAuthorizedUgi(saslServer.getAuthorizationID()); 404 if (RpcServer.LOG.isDebugEnabled()) { 405 RpcServer.LOG.debug("SASL server context established. Authenticated client: " + ugi + 406 ". Negotiated QoP is " + qop); 407 } 408 this.rpcServer.metrics.authenticationSuccess(); 409 RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); 410 saslContextEstablished = true; 411 } 412 } 413 } 414 415 private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException { 416 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); 417 // Read all RPCs contained in the inBuf, even partial ones 418 while (true) { 419 int count; 420 if (unwrappedDataLengthBuffer.remaining() > 0) { 421 count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer); 422 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) 423 return; 424 } 425 426 if (unwrappedData == null) { 427 unwrappedDataLengthBuffer.flip(); 428 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); 429 430 if (unwrappedDataLength == RpcClient.PING_CALL_ID) { 431 if (RpcServer.LOG.isDebugEnabled()) 432 RpcServer.LOG.debug("Received ping message"); 433 unwrappedDataLengthBuffer.clear(); 434 continue; // ping message 435 } 436 unwrappedData = ByteBuffer.allocate(unwrappedDataLength); 437 } 438 439 count = this.rpcServer.channelRead(ch, unwrappedData); 440 if (count <= 0 || unwrappedData.remaining() > 0) 441 return; 442 443 if (unwrappedData.remaining() == 0) { 444 unwrappedDataLengthBuffer.clear(); 445 unwrappedData.flip(); 446 processOneRpc(new SingleByteBuff(unwrappedData)); 447 unwrappedData = null; 448 } 449 } 450 } 451 452 public void processOneRpc(ByteBuff buf) throws IOException, 453 InterruptedException { 454 if (connectionHeaderRead) { 455 processRequest(buf); 456 } else { 457 processConnectionHeader(buf); 458 this.connectionHeaderRead = true; 459 if (!authorizeConnection()) { 460 // Throw FatalConnectionException wrapping ACE so client does right thing and closes 461 // down the connection instead of trying to read non-existent retun. 462 throw new AccessDeniedException("Connection from " + this + " for service " + 463 connectionHeader.getServiceName() + " is unauthorized for user: " + ugi); 464 } 465 this.user = this.rpcServer.userProvider.create(this.ugi); 466 } 467 } 468 469 private boolean authorizeConnection() throws IOException { 470 try { 471 // If auth method is DIGEST, the token was obtained by the 472 // real user for the effective user, therefore not required to 473 // authorize real user. doAs is allowed only for simple or kerberos 474 // authentication 475 if (ugi != null && ugi.getRealUser() != null 476 && (authMethod != AuthMethod.DIGEST)) { 477 ProxyUsers.authorize(ugi, this.getHostAddress(), this.rpcServer.conf); 478 } 479 this.rpcServer.authorize(ugi, connectionHeader, getHostInetAddress()); 480 this.rpcServer.metrics.authorizationSuccess(); 481 } catch (AuthorizationException ae) { 482 if (RpcServer.LOG.isDebugEnabled()) { 483 RpcServer.LOG.debug("Connection authorization failed: " + ae.getMessage(), ae); 484 } 485 this.rpcServer.metrics.authorizationFailure(); 486 doRespond(getErrorResponse(ae.getMessage(), new AccessDeniedException(ae))); 487 return false; 488 } 489 return true; 490 } 491 492 // Reads the connection header following version 493 private void processConnectionHeader(ByteBuff buf) throws IOException { 494 if (buf.hasArray()) { 495 this.connectionHeader = ConnectionHeader.parseFrom(buf.array()); 496 } else { 497 CodedInputStream cis = UnsafeByteOperations.unsafeWrap( 498 new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput(); 499 cis.enableAliasing(true); 500 this.connectionHeader = ConnectionHeader.parseFrom(cis); 501 } 502 String serviceName = connectionHeader.getServiceName(); 503 if (serviceName == null) throw new EmptyServiceNameException(); 504 this.service = RpcServer.getService(this.rpcServer.services, serviceName); 505 if (this.service == null) throw new UnknownServiceException(serviceName); 506 setupCellBlockCodecs(this.connectionHeader); 507 RPCProtos.ConnectionHeaderResponse.Builder chrBuilder = 508 RPCProtos.ConnectionHeaderResponse.newBuilder(); 509 setupCryptoCipher(this.connectionHeader, chrBuilder); 510 responseConnectionHeader(chrBuilder); 511 UserGroupInformation protocolUser = createUser(connectionHeader); 512 if (!useSasl) { 513 ugi = protocolUser; 514 if (ugi != null) { 515 ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod); 516 } 517 // audit logging for SASL authenticated users happens in saslReadAndProcess() 518 if (authenticatedWithFallback) { 519 RpcServer.LOG.warn("Allowed fallback to SIMPLE auth for {} connecting from {}", 520 ugi, getHostAddress()); 521 } 522 } else { 523 // user is authenticated 524 ugi.setAuthenticationMethod(authMethod.authenticationMethod); 525 //Now we check if this is a proxy user case. If the protocol user is 526 //different from the 'user', it is a proxy user scenario. However, 527 //this is not allowed if user authenticated with DIGEST. 528 if ((protocolUser != null) 529 && (!protocolUser.getUserName().equals(ugi.getUserName()))) { 530 if (authMethod == AuthMethod.DIGEST) { 531 // Not allowed to doAs if token authentication is used 532 throw new AccessDeniedException("Authenticated user (" + ugi 533 + ") doesn't match what the client claims to be (" 534 + protocolUser + ")"); 535 } else { 536 // Effective user can be different from authenticated user 537 // for simple auth or kerberos auth 538 // The user is the real user. Now we create a proxy user 539 UserGroupInformation realUser = ugi; 540 ugi = UserGroupInformation.createProxyUser(protocolUser 541 .getUserName(), realUser); 542 // Now the user is a proxy user, set Authentication method Proxy. 543 ugi.setAuthenticationMethod(AuthenticationMethod.PROXY); 544 } 545 } 546 } 547 String version; 548 if (this.connectionHeader.hasVersionInfo()) { 549 // see if this connection will support RetryImmediatelyException 550 this.retryImmediatelySupported = 551 VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2); 552 version = this.connectionHeader.getVersionInfo().getVersion(); 553 } else { 554 version = "UNKNOWN"; 555 } 556 RpcServer.AUDITLOG.info("Connection from {}:{}, version={}, sasl={}, ugi={}, service={}", 557 this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName); 558 } 559 560 /** 561 * Send the response for connection header 562 */ 563 private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder) 564 throws FatalConnectionException { 565 // Response the connection header if Crypto AES is enabled 566 if (!chrBuilder.hasCryptoCipherMeta()) return; 567 try { 568 byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray(); 569 // encrypt the Crypto AES cipher meta data with sasl server, and send to client 570 byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4]; 571 Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4); 572 Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length); 573 byte[] wrapped = saslServer.wrap(unwrapped, 0, unwrapped.length); 574 BufferChain bc; 575 try (ByteBufferOutputStream response = new ByteBufferOutputStream(wrapped.length + 4); 576 DataOutputStream out = new DataOutputStream(response)) { 577 out.writeInt(wrapped.length); 578 out.write(wrapped); 579 bc = new BufferChain(response.getByteBuffer()); 580 } 581 doRespond(() -> bc); 582 } catch (IOException ex) { 583 throw new UnsupportedCryptoException(ex.getMessage(), ex); 584 } 585 } 586 587 protected abstract void doRespond(RpcResponse resp) throws IOException; 588 589 /** 590 * @param buf 591 * Has the request header and the request param and optionally 592 * encoded data buffer all in this one array. 593 * @throws IOException 594 * @throws InterruptedException 595 */ 596 protected void processRequest(ByteBuff buf) throws IOException, 597 InterruptedException { 598 long totalRequestSize = buf.limit(); 599 int offset = 0; 600 // Here we read in the header. We avoid having pb 601 // do its default 4k allocation for CodedInputStream. We force it to use 602 // backing array. 603 CodedInputStream cis; 604 if (buf.hasArray()) { 605 cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit()).newCodedInput(); 606 } else { 607 cis = UnsafeByteOperations 608 .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput(); 609 } 610 cis.enableAliasing(true); 611 int headerSize = cis.readRawVarint32(); 612 offset = cis.getTotalBytesRead(); 613 Message.Builder builder = RequestHeader.newBuilder(); 614 ProtobufUtil.mergeFrom(builder, cis, headerSize); 615 RequestHeader header = (RequestHeader) builder.build(); 616 offset += headerSize; 617 int id = header.getCallId(); 618 if (RpcServer.LOG.isTraceEnabled()) { 619 RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) 620 + " totalRequestSize: " + totalRequestSize + " bytes"); 621 } 622 // Enforcing the call queue size, this triggers a retry in the client 623 // This is a bit late to be doing this check - we have already read in the 624 // total request. 625 if ((totalRequestSize + 626 this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { 627 final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null, 628 totalRequestSize, null, 0, this.callCleanup); 629 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 630 callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, 631 "Call queue is full on " + this.rpcServer.server.getServerName() + 632 ", is hbase.ipc.server.max.callqueue.size too small?"); 633 callTooBig.sendResponseIfReady(); 634 return; 635 } 636 MethodDescriptor md = null; 637 Message param = null; 638 CellScanner cellScanner = null; 639 try { 640 if (header.hasRequestParam() && header.getRequestParam()) { 641 md = this.service.getDescriptorForType().findMethodByName( 642 header.getMethodName()); 643 if (md == null) 644 throw new UnsupportedOperationException(header.getMethodName()); 645 builder = this.service.getRequestPrototype(md).newBuilderForType(); 646 cis.resetSizeCounter(); 647 int paramSize = cis.readRawVarint32(); 648 offset += cis.getTotalBytesRead(); 649 if (builder != null) { 650 ProtobufUtil.mergeFrom(builder, cis, paramSize); 651 param = builder.build(); 652 } 653 offset += paramSize; 654 } else { 655 // currently header must have request param, so we directly throw 656 // exception here 657 String msg = "Invalid request header: " 658 + TextFormat.shortDebugString(header) 659 + ", should have param set in it"; 660 RpcServer.LOG.warn(msg); 661 throw new DoNotRetryIOException(msg); 662 } 663 if (header.hasCellBlockMeta()) { 664 buf.position(offset); 665 ByteBuff dup = buf.duplicate(); 666 dup.limit(offset + header.getCellBlockMeta().getLength()); 667 cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers( 668 this.codec, this.compressionCodec, dup); 669 } 670 } catch (Throwable t) { 671 InetSocketAddress address = this.rpcServer.getListenerAddress(); 672 String msg = (address != null ? address : "(channel closed)") 673 + " is unable to read call parameter from client " 674 + getHostAddress(); 675 RpcServer.LOG.warn(msg, t); 676 677 this.rpcServer.metrics.exception(t); 678 679 // probably the hbase hadoop version does not match the running hadoop 680 // version 681 if (t instanceof LinkageError) { 682 t = new DoNotRetryIOException(t); 683 } 684 // If the method is not present on the server, do not retry. 685 if (t instanceof UnsupportedOperationException) { 686 t = new DoNotRetryIOException(t); 687 } 688 689 ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null, 690 totalRequestSize, null, 0, this.callCleanup); 691 readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage()); 692 readParamsFailedCall.sendResponseIfReady(); 693 return; 694 } 695 696 int timeout = 0; 697 if (header.hasTimeout() && header.getTimeout() > 0) { 698 timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); 699 } 700 ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize, 701 this.addr, timeout, this.callCleanup); 702 703 if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { 704 this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); 705 this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); 706 call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, 707 "Call queue is full on " + this.rpcServer.server.getServerName() + 708 ", too many items queued ?"); 709 call.sendResponseIfReady(); 710 } 711 } 712 713 protected final RpcResponse getErrorResponse(String msg, Exception e) throws IOException { 714 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder().setCallId(-1); 715 ServerCall.setExceptionResponse(e, msg, headerBuilder); 716 ByteBuffer headerBuf = 717 ServerCall.createHeaderAndMessageBytes(null, headerBuilder.build(), 0, null); 718 BufferChain buf = new BufferChain(headerBuf); 719 return () -> buf; 720 } 721 722 private void doBadPreambleHandling(String msg) throws IOException { 723 doBadPreambleHandling(msg, new FatalConnectionException(msg)); 724 } 725 726 private void doBadPreambleHandling(String msg, Exception e) throws IOException { 727 SimpleRpcServer.LOG.warn(msg); 728 doRespond(getErrorResponse(msg, e)); 729 } 730 731 protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException { 732 assert preambleBuffer.remaining() == 6; 733 for (int i = 0; i < RPC_HEADER.length; i++) { 734 if (RPC_HEADER[i] != preambleBuffer.get()) { 735 doBadPreambleHandling( 736 "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" + 737 Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from " + 738 toString()); 739 return false; 740 } 741 } 742 int version = preambleBuffer.get() & 0xFF; 743 byte authbyte = preambleBuffer.get(); 744 this.authMethod = AuthMethod.valueOf(authbyte); 745 if (version != SimpleRpcServer.CURRENT_VERSION) { 746 String msg = getFatalConnectionString(version, authbyte); 747 doBadPreambleHandling(msg, new WrongVersionException(msg)); 748 return false; 749 } 750 if (authMethod == null) { 751 String msg = getFatalConnectionString(version, authbyte); 752 doBadPreambleHandling(msg, new BadAuthException(msg)); 753 return false; 754 } 755 if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) { 756 if (this.rpcServer.allowFallbackToSimpleAuth) { 757 this.rpcServer.metrics.authenticationFallback(); 758 authenticatedWithFallback = true; 759 } else { 760 AccessDeniedException ae = new AccessDeniedException("Authentication is required"); 761 doRespond(getErrorResponse(ae.getMessage(), ae)); 762 return false; 763 } 764 } 765 if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) { 766 doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, 767 null); 768 authMethod = AuthMethod.SIMPLE; 769 // client has already sent the initial Sasl message and we 770 // should ignore it. Both client and server should fall back 771 // to simple auth from now on. 772 skipInitialSaslHandshake = true; 773 } 774 if (authMethod != AuthMethod.SIMPLE) { 775 useSasl = true; 776 } 777 return true; 778 } 779 780 public abstract boolean isConnectionOpen(); 781 782 public abstract ServerCall<?> createCall(int id, BlockingService service, MethodDescriptor md, 783 RequestHeader header, Message param, CellScanner cellScanner, long size, 784 InetAddress remoteAddress, int timeout, CallCleanup reqCleanup); 785 786 private static class ByteBuffByteInput extends ByteInput { 787 788 private ByteBuff buf; 789 private int offset; 790 private int length; 791 792 ByteBuffByteInput(ByteBuff buf, int offset, int length) { 793 this.buf = buf; 794 this.offset = offset; 795 this.length = length; 796 } 797 798 @Override 799 public byte read(int offset) { 800 return this.buf.get(getAbsoluteOffset(offset)); 801 } 802 803 private int getAbsoluteOffset(int offset) { 804 return this.offset + offset; 805 } 806 807 @Override 808 public int read(int offset, byte[] out, int outOffset, int len) { 809 this.buf.get(getAbsoluteOffset(offset), out, outOffset, len); 810 return len; 811 } 812 813 @Override 814 public int read(int offset, ByteBuffer out) { 815 int len = out.remaining(); 816 this.buf.get(out, getAbsoluteOffset(offset), len); 817 return len; 818 } 819 820 @Override 821 public int size() { 822 return this.length; 823 } 824 } 825}