/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;

import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
 * remote address. Calls are multiplexed through this socket: responses may be delivered out of
 * order.
 */
@InterfaceAudience.Private
class BlockingRpcConnection extends RpcConnection implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);

  private final BlockingRpcClient rpcClient;

  private final String threadName;
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "We are always under lock actually")
  private Thread thread;

  // connected socket. protected for writing unit tests.
  protected Socket socket = null;
  private DataInputStream in;
  private DataOutputStream out;

  private HBaseSaslRpcClient saslRpcClient;

  // currently active calls
  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();

  private final CallSender callSender;

  private boolean closed = false;

  private byte[] connectionHeaderPreamble;

  private byte[] connectionHeaderWithLength;

  private boolean waitingConnectionHeaderResponse = false;

  /**
   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is
   * to use a different thread for writing. This way, on interruptions, we either cancel the writes
   * or ignore the answer if the write is already done, but we don't stop the write in the middle.
   * This adds a thread per region server in the client, so it's kept as an option.
   * <p>
   * The implementation is simple: the client threads add their calls to the queue and then wait
   * for an answer. The CallSender blocks on the queue, and writes the calls one after the other.
   * On interruption, the client cancels its call. The CallSender checks that the call has not been
   * canceled before writing it.
   * </p>
   * <p>
   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
   * notified with an appropriate exception, as if the call was already sent but the answer not yet
   * received.
   * </p>
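   * <p>
   * Note: this writer thread is optional. It is only created when the
   * {@link BlockingRpcClient#SPECIFIC_WRITE_THREAD} configuration is enabled (see the constructor
   * below); otherwise the calling thread writes the request directly in {@code sendRequest}.
   * </p>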
   */
  private class CallSender extends Thread {

    private final Queue<Call> callsToWrite;

    private final int maxQueueSize;

    public CallSender(String name, Configuration conf) {
      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
      callsToWrite = new ArrayDeque<>(queueSize);
      this.maxQueueSize = queueSize;
      setDaemon(true);
      setName(name + " - writer");
    }

    public void sendCall(final Call call) throws IOException {
      if (callsToWrite.size() >= maxQueueSize) {
        throw new IOException("Can't add " + call.toShortString()
          + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
      }
      callsToWrite.offer(call);
      BlockingRpcConnection.this.notifyAll();
    }

    public void remove(Call call) {
      callsToWrite.remove(call);
      // By removing the call from the expected call list, we make the list smaller, but
      // it also means that we don't know how many calls we cancelled.
      calls.remove(call.id);
      call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
        + call.timeout));
    }

    /**
     * Reads calls from the queue and writes them to the socket.
     */
    @Override
    public void run() {
      synchronized (BlockingRpcConnection.this) {
        while (!closed) {
          if (callsToWrite.isEmpty()) {
            // We should use another monitor object here for better performance since the read
            // thread also uses BlockingRpcConnection.this. But this makes the locking schema more
            // complicated, can do it later as an optimization.
            try {
              BlockingRpcConnection.this.wait();
            } catch (InterruptedException e) {
              // Restore interrupt status
              Thread.currentThread().interrupt();
            }
            // check if we need to quit, so continue the main loop instead of falling through.
            continue;
          }
          Call call = callsToWrite.poll();
          if (call.isDone()) {
            continue;
          }
          try (Scope scope = call.span.makeCurrent()) {
            writeRequest(call);
          } catch (IOException e) {
            // an exception here means the call has not been added to the pendingCalls yet, so we
            // need to fail it ourselves.
            LOG.debug("call write error for {}", call.toShortString());
            call.setException(e);
            closeConn(e);
          }
        }
      }
    }

    /**
     * Cleans up the calls not yet sent when we finish.
     */
    public void cleanup(IOException e) {
      IOException ie =
        new ConnectionClosingException("Connection to " + remoteId.getAddress() + " is closing.");
      for (Call call : callsToWrite) {
        call.setException(ie);
      }
      callsToWrite.clear();
    }
  }

  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
      rpcClient.cellBlockBuilder, rpcClient.metrics, rpcClient.connectionAttributes);
    this.rpcClient = rpcClient;
    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
    ConnectionHeader header = getConnectionHeader();
    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeInt(header.getSerializedSize());
    header.writeTo(dos);
    assert baos.size() == 4 + header.getSerializedSize();
    this.connectionHeaderWithLength = baos.getBuffer();

    UserGroupInformation ticket = remoteId.ticket.getUGI();
    this.threadName = "BRPC Connection (" + this.rpcClient.socketFactory.hashCode() + ") to "
      + remoteId.getAddress().toString()
      + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));

    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
      callSender = new CallSender(threadName, this.rpcClient.conf);
      callSender.start();
    } else {
      callSender = null;
    }
  }

  // protected for writing unit tests.
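  // Connection establishment retries: timeouts and other IO failures are counted with separate
  // counters, and each counter is allowed up to rpcClient.maxRetries attempts, with a sleep of
  // rpcClient.failureSleep between attempts (see handleConnectionFailure below).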
  protected void setupConnection() throws IOException {
    short ioFailures = 0;
    short timeoutFailures = 0;
    while (true) {
      try {
        this.socket = this.rpcClient.socketFactory.createSocket();
        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
        if (this.rpcClient.localAddr != null) {
          this.socket.bind(this.rpcClient.localAddr);
        }
        InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
        NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
        this.socket.setSoTimeout(this.rpcClient.readTO);
        return;
      } catch (SocketTimeoutException toe) {
        /*
         * The max number of retries is 45, which amounts to 20s * 45 = 15 minutes of retries.
         */
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(toe));
        }
        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
      } catch (IOException ie) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(ie));
        }
        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
      }
    }
  }

  /**
   * Handle connection failures. If the current number of retries is equal to the max number of
   * retries, stop retrying and throw the exception; otherwise back off and try connecting again.
   * This method is only called from inside setupIOstreams(), which is synchronized. Hence the
   * sleep is synchronized; the locks will be retained.
   * @param curRetries current number of retries
   * @param maxRetries max number of retries allowed
   * @param ioe failure reason
   * @throws IOException if the max number of retries is reached
   */
  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
    throws IOException {
    closeSocket();

    // throw the exception if the maximum number of retries is reached
    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
      throw ioe;
    }

    // otherwise back off and retry
    try {
      Thread.sleep(this.rpcClient.failureSleep);
    } catch (InterruptedException ie) {
      ExceptionUtil.rethrowIfInterrupt(ie);
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping "
        + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s).");
    }
  }

  /*
   * Wait till someone signals us to start reading the RPC response, or the connection is idle too
   * long, or it is marked as to be closed, or the client is marked as not running.
   * @return true if it is time to read a response; false otherwise.
   */
  private synchronized boolean waitForWork() {
    // beware of the concurrent access to the calls list: we can add calls, but as well
    // remove them.
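    // The loop below returns true as soon as there is at least one pending call to read a
    // response for. It returns false when the connection has been closed (thread == null) or when
    // it has stayed idle for minIdleTimeBeforeClose, in which case the connection is closed and
    // the reader thread exits.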
    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
    for (;;) {
      if (thread == null) {
        return false;
      }
      if (!calls.isEmpty()) {
        return true;
      }
      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
        closeConn(
          new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
        return false;
      }
      try {
        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
      } catch (InterruptedException e) {
        // Restore interrupt status
        Thread.currentThread().interrupt();
      }
    }
  }

  @Override
  public void run() {
    if (LOG.isTraceEnabled()) {
      LOG.trace(threadName + ": starting");
    }
    while (waitForWork()) {
      readResponse();
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(threadName + ": stopped");
    }
  }

  private void disposeSasl() {
    if (saslRpcClient != null) {
      saslRpcClient.dispose();
      saslRpcClient = null;
    }
  }

  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2,
    String serverPrincipal) throws IOException {
    if (this.metrics != null) {
      this.metrics.incrNsLookups();
    }
    saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
      socket.getInetAddress(), serverPrincipal, this.rpcClient.fallbackAllowed,
      this.rpcClient.conf.get("hbase.rpc.protection",
        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
      this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
    return saslRpcClient.saslConnect(in2, out2);
  }

  /**
   * If multiple clients with the same principal try to connect to the same server at the same
   * time, the server assumes a replay attack is in progress. This is a feature of Kerberos. In
   * order to work around this, the client backs off randomly and tries to initiate the connection
   * again. The other problem is to do with ticket expiry. To handle that, a relogin is attempted.
   * <p>
   * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()} method.
   * Some providers have the ability to obtain new credentials and then re-attempt to authenticate
   * with HBase services. Other providers will continue to fail if they failed the first time --
   * for those, we want to fail-fast.
   * </p>
   */
  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
    final Exception ex, final UserGroupInformation user, final String serverPrincipal)
    throws IOException, InterruptedException {
    closeSocket();
    user.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException, InterruptedException {
        // A provider which failed authentication, but doesn't have the ability to relogin with
        // some external system (e.g. username/password, the password either works or it doesn't)
        if (!provider.canRetry()) {
          LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);
          if (ex instanceof RemoteException) {
            throw (RemoteException) ex;
          }
          if (ex instanceof SaslException) {
            String msg = "SASL authentication failed."
              + " The most likely cause is missing or invalid credentials.";
            throw new RuntimeException(msg, ex);
          }
          throw new IOException(ex);
        }

        // Other providers, like Kerberos, could request a new ticket from a keytab. Let
        // them try again.
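        // If we still have retry budget, relogin, clear the old SASL state and back off for a
        // random interval before the caller retries; the randomness helps avoid several clients
        // hitting the server again at the same instant (see the replay-attack note above).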
        if (currRetries < maxRetries) {
          LOG.debug("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);

          // Invoke the provider to perform the relogin
          provider.relogin();

          // Get rid of any old state on the SaslClient
          disposeSasl();

          // have granularity of milliseconds
          // we are sleeping with the Connection lock held but since this
          // connection instance is being used for connecting to the server
          // in question, it is okay
          Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
          return null;
        } else {
          String msg = "Failed to initiate connection for "
            + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
          throw new IOException(msg, ex);
        }
      }
    });
  }

  private void getConnectionRegistry(InputStream inStream, OutputStream outStream,
    Call connectionRegistryCall) throws IOException {
    outStream.write(RpcClient.REGISTRY_PREAMBLE_HEADER);
    readResponse(new DataInputStream(inStream), calls, connectionRegistryCall, remoteExc -> {
      synchronized (this) {
        closeConn(remoteExc);
      }
    });
  }

  private void createStreams(InputStream inStream, OutputStream outStream) {
    this.in = new DataInputStream(new BufferedInputStream(inStream));
    this.out = new DataOutputStream(new BufferedOutputStream(outStream));
  }

  // choose the server principal to use
  private String chooseServerPrincipal(InputStream inStream, OutputStream outStream)
    throws IOException {
    Set<String> serverPrincipals = getServerPrincipals();
    if (serverPrincipals.size() == 1) {
      return serverPrincipals.iterator().next();
    }
    // this means we use kerberos authentication and there are multiple server principal
    // candidates, in which case we need to send a special preamble header to get the server
    // principal from the server
    Call securityPreambleCall = createSecurityPreambleCall(r -> {
    });
    outStream.write(RpcClient.SECURITY_PREAMBLE_HEADER);
    readResponse(new DataInputStream(inStream), calls, securityPreambleCall, remoteExc -> {
      synchronized (this) {
        closeConn(remoteExc);
      }
    });
    if (securityPreambleCall.error != null) {
      LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address,
        securityPreambleCall.error);
      if (ConnectionUtils.isUnexpectedPreambleHeaderException(securityPreambleCall.error)) {
        // this means we are connecting to an old server which does not support the security
        // preamble call, so we should fall back to randomly selecting a principal to use
        // TODO: find a way to reconnect without failing all the pending calls, for now, when we
        // reach here, shutdown should have already been scheduled
        throw securityPreambleCall.error;
      }
      if (IPCUtil.isSecurityNotEnabledException(securityPreambleCall.error)) {
        // the server tells us security is not enabled, then we should check whether falling back
        // to simple auth is allowed; if so we just go without security, otherwise we should fail
        // the negotiation immediately
        if (rpcClient.fallbackAllowed) {
          // TODO: just change the preamble and skip the fallback to simple logic, for now, just
          // selecting the first principal can finish the connection setup, but it wastes one
          // client message
          return serverPrincipals.iterator().next();
        } else {
          throw new FallbackDisallowedException();
        }
      }
      return randomSelect(serverPrincipals);
    }
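    // The security preamble call succeeded, so the server has told us which principal it runs
    // with; delegate to the chooseServerPrincipal(Set, Call) overload inherited from RpcConnection
    // to pick the matching candidate.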
    return chooseServerPrincipal(serverPrincipals, securityPreambleCall);
  }

  private void setupIOstreams(Call connectionRegistryCall) throws IOException {
    if (socket != null) {
      // The connection is already available. Perfect.
      return;
    }

    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not trying to connect to " + remoteId.getAddress()
          + ", this server is in the failed servers list");
      }
      throw new FailedServerException(
        "This server is in the failed servers list: " + remoteId.getAddress());
    }

    try {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Connecting to " + remoteId.getAddress());
      }

      short numRetries = 0;
      int reloginMaxRetries = this.rpcClient.conf.getInt("hbase.security.relogin.maxretries", 5);
      while (true) {
        setupConnection();
        InputStream inStream = NetUtils.getInputStream(socket);
        // This creates a socket with a write timeout. This timeout cannot be changed.
        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
        if (connectionRegistryCall != null) {
          getConnectionRegistry(inStream, outStream, connectionRegistryCall);
          closeSocket();
          return;
        }

        if (useSasl) {
          UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
          boolean continueSasl;
          if (ticket == null) {
            throw new FatalConnectionException("ticket/user is null");
          }
          String serverPrincipal = chooseServerPrincipal(inStream, outStream);
          // Write out the preamble -- MAGIC, version, and auth to use.
          writeConnectionHeaderPreamble(outStream);
          try {
            final InputStream in2 = inStream;
            final OutputStream out2 = outStream;
            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
              @Override
              public Boolean run() throws IOException {
                return setupSaslConnection(in2, out2, serverPrincipal);
              }
            });
          } catch (Exception ex) {
            ExceptionUtil.rethrowIfInterrupt(ex);
            saslNegotiationDone(serverPrincipal, false);
            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket,
              serverPrincipal);
            continue;
          }
          saslNegotiationDone(serverPrincipal, true);
          if (continueSasl) {
            // Sasl connect is successful. Let's set up Sasl i/o streams.
            inStream = saslRpcClient.getInputStream();
            outStream = saslRpcClient.getOutputStream();
          } else {
            // fall back to simple auth because server told us so.
            // do not change authMethod and useSasl here, we should start from secure when
            // reconnecting because the regionserver may change its sasl config after restart.
            saslRpcClient = null;
          }
        } else {
          // Write out the preamble -- MAGIC, version, and auth to use.
          writeConnectionHeaderPreamble(outStream);
        }
        createStreams(inStream, outStream);
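        // The connection header was already serialized in the constructor as a 4-byte length
        // followed by the ConnectionHeader protobuf. A ConnectionHeaderResponse is only read back
        // when Crypto AES encryption has been negotiated; see writeConnectionHeader() and
        // processResponseForConnectionHeader() below.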
        // Now write out the connection header
        writeConnectionHeader();
        // process the response from server for connection header if necessary
        processResponseForConnectionHeader();
        break;
      }
    } catch (Throwable t) {
      closeSocket();
      IOException e = ExceptionUtil.asInterrupt(t);
      if (e == null) {
        this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t);
        if (t instanceof LinkageError) {
          // probably the hbase hadoop version does not match the running hadoop version
          e = new DoNotRetryIOException(t);
        } else if (t instanceof IOException) {
          e = (IOException) t;
        } else {
          e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t);
        }
      }
      throw e;
    }

    // start the receiver thread after the socket connection has been set up
    thread = new Thread(this, threadName);
    thread.setDaemon(true);
    thread.start();
  }

  /**
   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
   */
  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
    out.write(connectionHeaderPreamble);
    out.flush();
  }

  /**
   * Write the connection header.
   */
  private void writeConnectionHeader() throws IOException {
    boolean isCryptoAesEnable = false;
    // check if Crypto AES is enabled
    if (saslRpcClient != null) {
      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
        .equalsIgnoreCase(saslRpcClient.getSaslQOP());
      isCryptoAesEnable = saslEncryptionEnabled
        && conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
    }

    // if Crypto AES is enabled, set transformation and negotiate with server
    if (isCryptoAesEnable) {
      waitingConnectionHeaderResponse = true;
    }
    this.out.write(connectionHeaderWithLength);
    this.out.flush();
  }

  private void processResponseForConnectionHeader() throws IOException {
    // if no response expected, return
    if (!waitingConnectionHeaderResponse) return;
    try {
      // read the ConnectionHeaderResponse from server
      int len = this.in.readInt();
      byte[] buff = new byte[len];
      int readSize = this.in.read(buff);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Length of response for connection header:" + readSize);
      }

      RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
        RPCProtos.ConnectionHeaderResponse.parseFrom(buff);

      // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
      if (connectionHeaderResponse.hasCryptoCipherMeta()) {
        negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
      }
      waitingConnectionHeaderResponse = false;
    } catch (SocketTimeoutException ste) {
      LOG.error(HBaseMarkers.FATAL,
        "Can't get the connection header response due to rpc timeout; please check whether the "
          + "server has the correct configuration to support the additional function.",
        ste);
      // timeout when waiting for the connection header response, ignore the additional function
      throw new IOException("Timeout while waiting for connection header response", ste);
    }
  }

  private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta) throws IOException {
    // initialize the Crypto AES with CryptoCipherMeta
    saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
    // reset the inputStream/outputStream for Crypto AES encryption
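    // (from here on, call traffic flows through the cipher-wrapped streams returned by the
    // HBaseSaslRpcClient)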
    this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
    this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
  }

  /**
   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
   * the Connection thread, but by other threads.
   * @see #readResponse()
   */
  private void writeRequest(Call call) throws IOException {
    ByteBuf cellBlock = null;
    try {
      cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
        call.cells, PooledByteBufAllocator.DEFAULT);
      CellBlockMeta cellBlockMeta;
      if (cellBlock != null) {
        cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
      } else {
        cellBlockMeta = null;
      }
      RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
      if (call.isConnectionRegistryCall()) {
        setupIOstreams(call);
        return;
      }
      setupIOstreams(null);

      // Now we're going to write the call. We take the lock, then check that the connection
      // is still valid, and, if so, we do the write to the socket. If the write fails, we don't
      // know where we stand, so we have to close the connection.
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      calls.put(call.id, call); // We put first as we don't want the connection to become idle.
      // from here, we do not throw any exception to the upper layer as the call has been tracked
      // in the pending calls map.
      try {
        call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
      } catch (Throwable t) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Error while writing {}", call.toShortString());
        }
        IOException e = IPCUtil.toIOE(t);
        closeConn(e);
        return;
      }
    } finally {
      if (cellBlock != null) {
        cellBlock.release();
      }
    }
    notifyAll();
  }

  /*
   * Receive a response. Because there is only one receiver thread, no synchronization is needed
   * on in.
   */
  private void readResponse() {
    try {
      readResponse(in, calls, null, remoteExc -> {
        synchronized (this) {
          closeConn(remoteExc);
        }
      });
    } catch (IOException e) {
      if (e instanceof SocketTimeoutException) {
        // Clean up open calls but don't treat this as a fatal condition,
        // since we expect certain responses to not make it by the specified
        // {@link ConnectionId#rpcTimeout}.
        if (LOG.isTraceEnabled()) {
          LOG.trace("ignored", e);
        }
      } else {
        synchronized (this) {
          closeConn(e);
        }
      }
    }
  }

  @Override
  protected synchronized void callTimeout(Call call) {
    // call sender
    calls.remove(call.id);
  }

  // just close socket input and output.
  private void closeSocket() {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(socket);
    out = null;
    in = null;
    socket = null;
  }

  // close socket, reader, and clean up all pending calls.
  private void closeConn(IOException e) {
    if (thread == null) {
      return;
    }
    thread.interrupt();
    thread = null;
    closeSocket();
    if (callSender != null) {
      callSender.cleanup(e);
    }
    for (Call call : calls.values()) {
      call.setException(e);
    }
    calls.clear();
  }

  // release all resources, the connection will not be used any more.
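  // Shutdown interrupts the optional CallSender thread, then closeConn interrupts the reader
  // thread, closes the socket and fails every pending call (and, via CallSender.cleanup, every
  // queued but unsent call) with the given exception.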
  @Override
  public synchronized void shutdown() {
    closed = true;
    if (callSender != null) {
      callSender.interrupt();
    }
    closeConn(new IOException("connection to " + remoteId.getAddress() + " closed"));
  }

  @Override
  public void cleanupConnection() {
    // do nothing
  }

  @Override
  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
    throws IOException {
    pcrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        synchronized (BlockingRpcConnection.this) {
          if (callSender != null) {
            callSender.remove(call);
          } else {
            calls.remove(call.id);
          }
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
          return;
        }
        scheduleTimeoutTask(call);
        if (callSender != null) {
          callSender.sendCall(call);
        } else {
          // this is in the same thread as the caller so we do not need to attach the trace
          // context again.
          writeRequest(call);
        }
      }
    });
  }

  @Override
  public synchronized boolean isActive() {
    return thread != null;
  }
}