/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;

import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
 * remote address. Calls are multiplexed through this socket: responses may be delivered out of
 * order.
 * <p>
 * The instance itself is the {@link Runnable} executed by the reader thread started at the end of
 * {@link #setupIOstreams(Call)}. Writes are performed either directly by the calling thread or,
 * when {@link BlockingRpcClient#SPECIFIC_WRITE_THREAD} is enabled, by the dedicated
 * {@link CallSender} thread.
 * </p>
 */
@InterfaceAudience.Private
class BlockingRpcConnection extends RpcConnection implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);

  private final BlockingRpcClient rpcClient;

  private final String threadName;
  // The reader thread; null while no reader is running (see closeConn and isActive).
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "We are always under lock actually")
  private Thread thread;

  // connected socket. protected for writing UT.
  protected Socket socket = null;
  private DataInputStream in;
  private DataOutputStream out;

  private HBaseSaslRpcClient saslRpcClient;

  // currently active calls
  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();

  // null unless the dedicated write thread option is enabled
  private final CallSender callSender;

  // set once by shutdown(); makes the CallSender loop terminate
  private boolean closed = false;

  private byte[] connectionHeaderPreamble;

  // the serialized connection header prefixed by its 4 byte length
  private byte[] connectionHeaderWithLength;

  // true while we still have to read a ConnectionHeaderResponse from the server
  private boolean waitingConnectionHeaderResponse = false;

  /**
   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
   * use a different thread for writing. This way, on interruptions, we either cancel the writes or
   * ignore the answer if the write is already done, but we don't stop the write in the middle. This
   * adds a thread per region server in the client, so it's kept as an option.
   * <p>
   * The implementation is simple: the client threads adds their call to the queue, and then wait
   * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
   * interruption, the client cancels its call. The CallSender checks that the call has not been
   * canceled before writing it.
   * </p>
   * <p>
   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
   * notified with an appropriate exception, as if the call was already sent but the answer not yet
   * received.
   * </p>
   */
  private class CallSender extends Thread {

    private final Queue<Call> callsToWrite;

    private final int maxQueueSize;

    /**
     * Creates a daemon writer thread. The thread is not started here.
     * @param name base name of the thread, {@code " - writer"} is appended to it
     * @param conf configuration used to read {@code hbase.ipc.client.write.queueSize}
     */
    public CallSender(String name, Configuration conf) {
      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
      callsToWrite = new ArrayDeque<>(queueSize);
      this.maxQueueSize = queueSize;
      setDaemon(true);
      setName(name + " - writer");
    }

    /**
     * Queues the call for writing and wakes up the threads waiting on the connection monitor.
     * <p>
     * NOTE(review): {@code notifyAll()} requires the caller to hold the monitor of the enclosing
     * connection; the visible caller is the callback registered in the synchronized
     * {@code sendRequest} -- confirm it is always invoked under that lock.
     * </p>
     * @param call the call to enqueue
     * @throws IOException if the write queue already holds {@code maxQueueSize} calls
     */
    public void sendCall(final Call call) throws IOException {
      if (callsToWrite.size() >= maxQueueSize) {
        throw new IOException("Can't add " + call.toShortString()
          + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
      }
      callsToWrite.offer(call);
      BlockingRpcConnection.this.notifyAll();
    }

    /**
     * Removes the call from both the write queue and the pending calls map and fails it with a
     * {@link CallCancelledException}.
     * @param call the call to cancel
     */
    public void remove(Call call) {
      callsToWrite.remove(call);
      // By removing the call from the expected call list, we make the list smaller, but
      // it means as well that we don't know how many calls we cancelled.
      calls.remove(call.id);
      call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
        + call.timeout));
    }

    /**
     * Reads the call from the queue, write them on the socket.
     */
    @Override
    public void run() {
      synchronized (BlockingRpcConnection.this) {
        while (!closed) {
          if (callsToWrite.isEmpty()) {
            // We should use another monitor object here for better performance since the read
            // thread also uses BlockingRpcConnection.this. But this makes the locking schema more
            // complicated, can do it later as an optimization.
            try {
              BlockingRpcConnection.this.wait();
            } catch (InterruptedException e) {
              // Restore interrupt status
              Thread.currentThread().interrupt();
            }
            // check if we need to quit, so continue the main loop instead of fallback.
            continue;
          }
          Call call = callsToWrite.poll();
          if (call.isDone()) {
            // already completed (e.g. cancelled or timed out) while queued, nothing to write
            continue;
          }
          try (Scope scope = call.span.makeCurrent()) {
            writeRequest(call);
          } catch (IOException e) {
            // exception here means the call has not been added to the pendingCalls yet, so we need
            // to fail it by our own.
            LOG.debug("call write error for {}", call.toShortString());
            call.setException(e);
            closeConn(e);
          }
        }
      }
    }

    /**
     * Cleans the call not yet sent when we finish. Every queued call is failed with a
     * {@link ConnectionClosingException} and the queue is emptied.
     * @param e the reason the connection is closing; currently not propagated to the queued
     *          calls, they always get a fresh {@link ConnectionClosingException}
     */
    public void cleanup(IOException e) {
      IOException ie =
        new ConnectionClosingException("Connection to " + remoteId.getAddress() + " is closing.");
      for (Call call : callsToWrite) {
        call.setException(ie);
      }
      callsToWrite.clear();
    }
  }

  /**
   * Creates the connection object without connecting. Pre-serializes the connection header (with
   * its length prefix), computes the reader thread name and, if
   * {@link BlockingRpcClient#SPECIFIC_WRITE_THREAD} is enabled, starts the {@link CallSender}.
   * @param rpcClient the owning client
   * @param remoteId  identifies the remote address and the user of this connection
   * @throws IOException if the connection header can not be serialized
   */
  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.providers, rpcClient.codec,
      rpcClient.compressor, rpcClient.cellBlockBuilder, rpcClient.metrics,
      rpcClient.connectionAttributes);
    this.rpcClient = rpcClient;
    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
    ConnectionHeader header = getConnectionHeader();
    // 4 bytes for the length prefix followed by the serialized header
    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeInt(header.getSerializedSize());
    header.writeTo(dos);
    assert baos.size() == 4 + header.getSerializedSize();
    this.connectionHeaderWithLength = baos.getBuffer();

    UserGroupInformation ticket = remoteId.ticket.getUGI();
    this.threadName = "BRPC Connection (" + this.rpcClient.socketFactory.hashCode() + ") to "
      + remoteId.getAddress().toString()
      + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));

    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
      callSender = new CallSender(threadName, this.rpcClient.conf);
      callSender.start();
    } else {
      callSender = null;
    }
  }

  /**
   * Creates, configures and connects the socket, retrying on failure. Timeout failures and other
   * I/O failures are counted separately, each against {@code rpcClient.maxRetries}. Protected for
   * write UT.
   * @throws IOException if the retries are exhausted or the failure is an interruption
   */
  protected void setupConnection() throws IOException {
    short ioFailures = 0;
    short timeoutFailures = 0;
    while (true) {
      try {
        this.socket = this.rpcClient.socketFactory.createSocket();
        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
        if (this.rpcClient.localAddr != null) {
          this.socket.bind(this.rpcClient.localAddr);
        }
        InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
        NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
        this.socket.setSoTimeout(this.rpcClient.readTO);
        return;
      } catch (SocketTimeoutException toe) {
        /*
         * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries.
         */
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(toe));
        }
        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
      } catch (IOException ie) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(ie));
        }
        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
      }
    }
  }

  /**
   * Handle connection failures. If the current number of retries is equal to the max number of
   * retries, stop retrying and throw the exception; Otherwise backoff N seconds and try connecting
   * again. This Method is only called from inside setupIOstreams(), which is synchronized. Hence
   * the sleep is synchronized; the locks will be retained.
   * @param curRetries current number of retries
   * @param maxRetries max number of retries allowed
   * @param ioe        failure reason
   * @throws IOException if max number of retries is reached
   */
  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
    throws IOException {
    closeSocket();

    // throw the exception if the maximum number of retries is reached
    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
      throw ioe;
    }

    // otherwise back off and retry
    try {
      Thread.sleep(this.rpcClient.failureSleep);
    } catch (InterruptedException ie) {
      ExceptionUtil.rethrowIfInterrupt(ie);
    }

    LOG.info("Retrying connect to server: {} after sleeping {}ms. Already tried {} time(s).",
      remoteId.getAddress(), this.rpcClient.failureSleep, curRetries);
  }

  /**
   * Wait till someone signals us to start reading RPC response or it is idle too long, it is marked
   * as to be closed, or the client is marked as not running.
   * @return true if it is time to read a response; false otherwise.
   */
  private synchronized boolean waitForWork() {
    // beware of the concurrent access to the calls list: we can add calls, but as well
    // remove them.
    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
    for (;;) {
      if (thread == null) {
        // closeConn has been called, the reader must stop
        return false;
      }
      if (!calls.isEmpty()) {
        return true;
      }
      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
        closeConn(
          new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
        return false;
      }
      try {
        // wake up at least once per second to re-check the idle deadline
        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
      } catch (InterruptedException e) {
        // Restore interrupt status
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Reader loop: reads one response each time {@link #waitForWork()} reports pending calls, and
   * exits as soon as it returns false.
   */
  @Override
  public void run() {
    LOG.trace("{}: starting", threadName);
    while (waitForWork()) {
      readResponse();
    }
    LOG.trace("{}: stopped", threadName);
  }

  /** Disposes the current sasl client, if any, and forgets it. */
  private void disposeSasl() {
    if (saslRpcClient != null) {
      saslRpcClient.dispose();
      saslRpcClient = null;
    }
  }

  /**
   * Creates a new {@link HBaseSaslRpcClient} and runs the sasl negotiation over the given streams.
   * @param in2             stream to read the server's sasl messages from
   * @param out2            stream to write our sasl messages to
   * @param serverPrincipal the server principal to authenticate against
   * @return the result of {@link HBaseSaslRpcClient#saslConnect}; the caller treats false as a
   *         fallback to simple auth
   * @throws IOException if the negotiation fails
   */
  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2,
    String serverPrincipal) throws IOException {
    if (this.metrics != null) {
      this.metrics.incrNsLookups();
    }
    saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
      socket.getInetAddress(), serverPrincipal, this.rpcClient.fallbackAllowed,
      this.rpcClient.conf.get("hbase.rpc.protection",
        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
      this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
    return saslRpcClient.saslConnect(in2, out2);
  }

  /**
   * If multiple clients with the same principal try to connect to the same server at the same time,
   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
   * work around this, what is done is that the client backs off randomly and tries to initiate the
   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
   * attempted.
   * <p>
   * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()} method.
   * Some providers have the ability to obtain new credentials and then re-attempt to authenticate
   * with HBase services. Other providers will continue to fail if they failed the first time -- for
   * those, we want to fail-fast.
   * </p>
   * @param currRetries     number of retries already performed
   * @param maxRetries      max number of retries allowed
   * @param ex              the failure which triggered this handling
   * @param user            the user to perform the relogin as
   * @param serverPrincipal the server principal we tried to authenticate against
   * @throws IOException          if the provider can not retry or the retries are exhausted
   * @throws InterruptedException if interrupted while backing off
   */
  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
    final Exception ex, final UserGroupInformation user, final String serverPrincipal)
    throws IOException, InterruptedException {
    closeSocket();
    user.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException, InterruptedException {
        // A provider which failed authentication, but doesn't have the ability to relogin with
        // some external system (e.g. username/password, the password either works or it doesn't)
        if (!provider.canRetry()) {
          LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);
          if (ex instanceof RemoteException) {
            throw (RemoteException) ex;
          }
          if (ex instanceof SaslException) {
            String msg = "SASL authentication failed."
              + " The most likely cause is missing or invalid credentials.";
            throw new RuntimeException(msg, ex);
          }
          throw new IOException(ex);
        }

        // Other providers, like kerberos, could request a new ticket from a keytab. Let
        // them try again.
        if (currRetries < maxRetries) {
          LOG.debug("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);

          // Invoke the provider to perform the relogin
          provider.relogin();

          // Get rid of any old state on the SaslClient
          disposeSasl();

          // have granularity of milliseconds
          // we are sleeping with the Connection lock held but since this
          // connection instance is being used for connecting to the server
          // in question, it is okay
          Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
          return null;
        } else {
          String msg = "Failed to initiate connection for "
            + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
          throw new IOException(msg, ex);
        }
      }
    });
  }

  /**
   * Sends the registry preamble header and reads the response for the given connection registry
   * call.
   * @param inStream               raw socket input stream
   * @param outStream              raw socket output stream
   * @param connectionRegistryCall the call to complete with the response
   * @throws IOException if writing the preamble or reading the response fails
   */
  private void getConnectionRegistry(InputStream inStream, OutputStream outStream,
    Call connectionRegistryCall) throws IOException {
    outStream.write(RpcClient.REGISTRY_PREAMBLE_HEADER);
    readResponse(new DataInputStream(inStream), calls, connectionRegistryCall, remoteExc -> {
      synchronized (this) {
        closeConn(remoteExc);
      }
    });
  }

  /** Wraps the given streams into buffered data streams and stores them as the rpc streams. */
  private void createStreams(InputStream inStream, OutputStream outStream) {
    this.in = new DataInputStream(new BufferedInputStream(inStream));
    this.out = new DataOutputStream(new BufferedOutputStream(outStream));
  }

  /**
   * Choose the server principal to use. With a single candidate it is returned directly, otherwise
   * a security preamble call is issued to ask the server.
   * @param inStream  raw socket input stream
   * @param outStream raw socket output stream
   * @return the server principal to use for the sasl negotiation
   * @throws IOException if the preamble call can not be performed, if the server does not support
   *                     it, or if security is not enabled on the server and fallback is not
   *                     allowed
   */
  private String chooseServerPrincipal(InputStream inStream, OutputStream outStream)
    throws IOException {
    Set<String> serverPrincipals = getServerPrincipals();
    if (serverPrincipals.size() == 1) {
      return serverPrincipals.iterator().next();
    }
    // this means we use kerberos authentication and there are multiple server principal candidates,
    // in this way we need to send a special preamble header to get server principal from server
    Call securityPreambleCall = createSecurityPreambleCall(r -> {
    });
    outStream.write(RpcClient.SECURITY_PREAMBLE_HEADER);
    readResponse(new DataInputStream(inStream), calls, securityPreambleCall, remoteExc -> {
      synchronized (this) {
        closeConn(remoteExc);
      }
    });
    if (securityPreambleCall.error != null) {
      LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address,
        securityPreambleCall.error);
      if (ConnectionUtils.isUnexpectedPreambleHeaderException(securityPreambleCall.error)) {
        // this means we are connecting to an old server which does not support the security
        // preamble call, so we should fallback to randomly select a principal to use
        // TODO: find a way to reconnect without failing all the pending calls, for now, when we
        // reach here, shutdown should have already been scheduled
        throw securityPreambleCall.error;
      }
      if (IPCUtil.isSecurityNotEnabledException(securityPreambleCall.error)) {
        // server tells us security is not enabled, then we should check whether fallback to
        // simple is allowed, if so we just go without security, otherwise we should fail the
        // negotiation immediately
        if (rpcClient.fallbackAllowed) {
          // TODO: just change the preamble and skip the fallback to simple logic, for now, just
          // select the first principal can finish the connection setup, but waste one client
          // message
          return serverPrincipals.iterator().next();
        } else {
          throw new FallbackDisallowedException();
        }
      }
      return randomSelect(serverPrincipals);
    }
    return chooseServerPrincipal(serverPrincipals, securityPreambleCall);
  }

  /**
   * Connects to the server if not connected yet: sets up the socket, performs the optional sasl
   * negotiation, exchanges the connection header and finally starts the reader thread.
   * <p>
   * When {@code connectionRegistryCall} is not null, only the registry preamble exchange is done on
   * a fresh socket which is closed right after; no reader thread is started in this case.
   * </p>
   * @param connectionRegistryCall the connection registry call to serve, or null for a normal
   *                               connection setup
   * @throws IOException if the server is in the failed servers list or the setup fails
   */
  private void setupIOstreams(Call connectionRegistryCall) throws IOException {
    if (socket != null) {
      // The connection is already available. Perfect.
      return;
    }

    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
      LOG.debug("Not trying to connect to {} this server is in the failed servers list",
        remoteId.getAddress());
      throw new FailedServerException(
        "This server is in the failed servers list: " + remoteId.getAddress());
    }

    try {
      LOG.debug("Connecting to {}", remoteId.getAddress());

      short numRetries = 0;
      int reloginMaxRetries = this.rpcClient.conf.getInt("hbase.security.relogin.maxretries", 5);
      while (true) {
        setupConnection();
        InputStream inStream = NetUtils.getInputStream(socket);
        // This creates a socket with a write timeout. This timeout cannot be changed.
        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
        if (connectionRegistryCall != null) {
          getConnectionRegistry(inStream, outStream, connectionRegistryCall);
          closeSocket();
          return;
        }

        if (useSasl) {
          UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
          boolean continueSasl;
          if (ticket == null) {
            throw new FatalConnectionException("ticket/user is null");
          }
          String serverPrincipal = chooseServerPrincipal(inStream, outStream);
          // Write out the preamble -- MAGIC, version, and auth to use.
          writeConnectionHeaderPreamble(outStream);
          try {
            final InputStream in2 = inStream;
            final OutputStream out2 = outStream;
            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
              @Override
              public Boolean run() throws IOException {
                return setupSaslConnection(in2, out2, serverPrincipal);
              }
            });
          } catch (Exception ex) {
            ExceptionUtil.rethrowIfInterrupt(ex);
            saslNegotiationDone(serverPrincipal, false);
            // closes the socket and either throws or backs off, in which case we reconnect
            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket,
              serverPrincipal);
            continue;
          }
          saslNegotiationDone(serverPrincipal, true);
          if (continueSasl) {
            // Sasl connect is successful. Let's set up Sasl i/o streams.
            inStream = saslRpcClient.getInputStream();
            outStream = saslRpcClient.getOutputStream();
          } else {
            // fall back to simple auth because server told us so.
            // do not change authMethod and useSasl here, we should start from secure when
            // reconnecting because regionserver may change its sasl config after restart.
            saslRpcClient = null;
          }
        } else {
          // Write out the preamble -- MAGIC, version, and auth to use.
          writeConnectionHeaderPreamble(outStream);
        }
        createStreams(inStream, outStream);
        // Now write out the connection header
        writeConnectionHeader();
        // process the response from server for connection header if necessary
        processResponseForConnectionHeader();
        break;
      }
    } catch (Throwable t) {
      closeSocket();
      IOException e = ExceptionUtil.asInterrupt(t);
      if (e == null) {
        // not an interruption, so remember the server as failed
        this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t);
        if (t instanceof LinkageError) {
          // probably the hbase hadoop version does not match the running hadoop version
          e = new DoNotRetryIOException(t);
        } else if (t instanceof IOException) {
          e = (IOException) t;
        } else {
          e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t);
        }
      }
      throw e;
    }

    // start the receiver thread after the socket connection has been set up
    thread = new Thread(this, threadName);
    thread.setDaemon(true);
    thread.start();
  }

  /**
   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
   * @param out the stream to write the preamble to; it is flushed afterwards
   * @throws IOException if the write fails
   */
  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
    out.write(connectionHeaderPreamble);
    out.flush();
  }

  /**
   * Write the connection header. If sasl is in use with the privacy QOP and Crypto AES is enabled
   * in the configuration, also records that a connection header response must be read.
   * @throws IOException if the write fails
   */
  private void writeConnectionHeader() throws IOException {
    boolean isCryptoAesEnable = false;
    // check if Crypto AES is enabled
    if (saslRpcClient != null) {
      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
        .equalsIgnoreCase(saslRpcClient.getSaslQOP());
      isCryptoAesEnable = saslEncryptionEnabled
        && conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
    }

    // if Crypto AES is enabled, set transformation and negotiate with server
    if (isCryptoAesEnable) {
      waitingConnectionHeaderResponse = true;
    }
    this.out.write(connectionHeaderWithLength);
    this.out.flush();
  }

  /**
   * Reads the length prefixed {@code ConnectionHeaderResponse} from the server, if one is expected,
   * and switches to Crypto AES streams when the response carries the cipher meta.
   * @throws IOException if the response is malformed, can not be read completely, or does not
   *                     arrive before the socket read timeout
   */
  private void processResponseForConnectionHeader() throws IOException {
    // if no response expected, return
    if (!waitingConnectionHeaderResponse) {
      return;
    }
    try {
      // read the ConnectionHeaderResponse from server
      int len = this.in.readInt();
      if (len < 0) {
        throw new IOException("Invalid length of response for connection header: " + len);
      }
      byte[] buff = new byte[len];
      // A plain read(byte[]) may return before the buffer is full, e.g. when the response spans
      // several TCP segments, which would make us parse a truncated message. readFully blocks
      // until all the len bytes have been read, or fails with an EOFException.
      this.in.readFully(buff);
      LOG.debug("Length of response for connection header:{}", len);

      RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
        RPCProtos.ConnectionHeaderResponse.parseFrom(buff);

      // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
      if (connectionHeaderResponse.hasCryptoCipherMeta()) {
        negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
      }
      waitingConnectionHeaderResponse = false;
    } catch (SocketTimeoutException ste) {
      LOG.error(HBaseMarkers.FATAL,
        "Can't get the connection header response for rpc timeout, "
          + "please check if server has the correct configuration to support the additional "
          + "function.",
        ste);
      // timeout when waiting the connection header response, ignore the additional function
      throw new IOException("Timeout while waiting connection header response", ste);
    }
  }

  /**
   * Initializes the crypto cipher on the sasl client and replaces the rpc streams with the ones
   * the sasl client provides afterwards.
   * @param cryptoCipherMeta the cipher meta received from the server
   * @throws IOException if the cipher can not be initialized
   */
  private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta) throws IOException {
    // initialize the Crypto AES with CryptoCipherMeta
    saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
    // reset the inputStream/outputStream for Crypto AES encryption
    this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
    this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
  }

  /**
   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
   * the Connection thread, but by other threads.
   * <p>
   * Failures before the call is registered in the pending calls map are thrown to the caller;
   * failures while writing close the connection instead, which fails every pending call.
   * </p>
   * @param call the call to send
   * @throws IOException if the connection can not be set up or the thread has been interrupted
   * @see #readResponse()
   */
  private void writeRequest(Call call) throws IOException {
    ByteBuf cellBlock = null;
    try {
      cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
        call.cells, PooledByteBufAllocator.DEFAULT);
      CellBlockMeta cellBlockMeta;
      if (cellBlock != null) {
        cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
      } else {
        cellBlockMeta = null;
      }
      RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
      if (call.isConnectionRegistryCall()) {
        // served entirely by the preamble exchange done while setting up the streams
        setupIOstreams(call);
        return;
      }
      setupIOstreams(null);

      // Now we're going to write the call. We take the lock, then check that the connection
      // is still valid, and, if so we do the write to the socket. If the write fails, we don't
      // know where we stand, we have to close the connection.
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      calls.put(call.id, call); // We put first as we don't want the connection to become idle.
      // from here, we do not throw any exception to upper layer as the call has been tracked in
      // the pending calls map.
      try {
        call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
      } catch (Throwable t) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Error while writing {}", call.toShortString());
        }
        IOException e = IPCUtil.toIOE(t);
        closeConn(e);
        return;
      }
    } finally {
      if (cellBlock != null) {
        cellBlock.release();
      }
    }
    // wake up the reader thread which may be waiting for work
    notifyAll();
  }

  /**
   * Receive a response. Because only one receiver, so no synchronization on in. Socket timeouts
   * are ignored, any other I/O failure closes the connection.
   */
  private void readResponse() {
    try {
      readResponse(in, calls, null, remoteExc -> {
        synchronized (this) {
          closeConn(remoteExc);
        }
      });
    } catch (IOException e) {
      if (e instanceof SocketTimeoutException) {
        // Clean up open calls but don't treat this as a fatal condition,
        // since we expect certain responses to not make it by the specified
        // {@link ConnectionId#rpcTimeout}.
        if (LOG.isTraceEnabled()) {
          LOG.trace("ignored", e);
        }
      } else {
        synchronized (this) {
          closeConn(e);
        }
      }
    }
  }

  /**
   * Drops the timed out call from the pending calls map.
   * @param call the call which timed out
   */
  @Override
  protected synchronized void callTimeout(Call call) {
    // call sender
    calls.remove(call.id);
  }

  // just close socket input and output.
  private void closeSocket() {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(socket);
    out = null;
    in = null;
    socket = null;
  }

  /**
   * Close socket, reader, and clean up all pending calls. Does nothing when no reader thread is
   * running.
   * @param e the exception to fail the pending calls with
   */
  private void closeConn(IOException e) {
    if (thread == null) {
      return;
    }
    thread.interrupt();
    thread = null;
    closeSocket();
    if (callSender != null) {
      callSender.cleanup(e);
    }
    for (Call call : calls.values()) {
      call.setException(e);
    }
    calls.clear();
  }

  // release all resources, the connection will not be used any more.
  @Override
  public synchronized void shutdown() {
    closed = true;
    if (callSender != null) {
      callSender.interrupt();
    }
    closeConn(new IOException("connection to " + remoteId.getAddress() + " closed"));
  }

  @Override
  public void cleanupConnection() {
    // do nothing
  }

  /**
   * Registers the cancellation hooks for the call and, unless it has already been cancelled,
   * schedules its timeout task and sends it, either through the {@link CallSender} or directly in
   * the calling thread.
   * @param call the call to send
   * @param pcrc the controller used to observe cancellation
   * @throws IOException if the call can not be queued or written
   */
  @Override
  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
    throws IOException {
    pcrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        synchronized (BlockingRpcConnection.this) {
          if (callSender != null) {
            callSender.remove(call);
          } else {
            calls.remove(call.id);
          }
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
          return;
        }
        scheduleTimeoutTask(call);
        if (callSender != null) {
          callSender.sendCall(call);
        } else {
          // this is in the same thread with the caller so do not need to attach the trace context
          // again.
          writeRequest(call);
        }
      }
    });
  }

  /** Returns true while the reader thread reference is set, i.e. the connection is established. */
  @Override
  public synchronized boolean isActive() {
    return thread != null;
  }
}