001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader; 021import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException; 022import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited; 023import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException; 024import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; 025import static org.apache.hadoop.hbase.ipc.IPCUtil.write; 026 027import io.opentelemetry.context.Scope; 028import java.io.BufferedInputStream; 029import java.io.BufferedOutputStream; 030import java.io.DataInputStream; 031import java.io.DataOutputStream; 032import java.io.IOException; 033import java.io.InputStream; 034import java.io.InterruptedIOException; 035import java.io.OutputStream; 036import java.net.InetSocketAddress; 037import java.net.Socket; 038import java.net.SocketTimeoutException; 039import java.security.PrivilegedExceptionAction; 040import java.util.ArrayDeque; 041import java.util.Locale; 042import java.util.Queue; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.ConcurrentMap; 045import java.util.concurrent.ThreadLocalRandom; 046import javax.security.sasl.SaslException; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.hbase.CellScanner; 049import org.apache.hadoop.hbase.DoNotRetryIOException; 050import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; 051import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 052import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; 053import org.apache.hadoop.hbase.log.HBaseMarkers; 054import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; 055import org.apache.hadoop.hbase.security.SaslUtil; 056import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; 057import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; 058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 059import org.apache.hadoop.hbase.util.ExceptionUtil; 060import org.apache.hadoop.io.IOUtils; 061import org.apache.hadoop.ipc.RemoteException; 062import org.apache.hadoop.net.NetUtils; 063import org.apache.hadoop.security.UserGroupInformation; 064import org.apache.hadoop.util.StringUtils; 065import org.apache.yetus.audience.InterfaceAudience; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069import org.apache.hbase.thirdparty.com.google.protobuf.Message; 070import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 071import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 072import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; 073 074import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 081 082/** 083 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a 084 * remote address. Calls are multiplexed through this socket: responses may be delivered out of 085 * order. 086 */ 087@InterfaceAudience.Private 088class BlockingRpcConnection extends RpcConnection implements Runnable { 089 090 private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class); 091 092 private final BlockingRpcClient rpcClient; 093 094 private final String threadName; 095 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 096 justification = "We are always under lock actually") 097 private Thread thread; 098 099 // connected socket. protected for writing UT. 100 protected Socket socket = null; 101 private DataInputStream in; 102 private DataOutputStream out; 103 104 private HBaseSaslRpcClient saslRpcClient; 105 106 // currently active calls 107 private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>(); 108 109 private final CallSender callSender; 110 111 private boolean closed = false; 112 113 private byte[] connectionHeaderPreamble; 114 115 private byte[] connectionHeaderWithLength; 116 117 private boolean waitingConnectionHeaderResponse = false; 118 119 /** 120 * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a 121 * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to 122 * use a different thread for writing. This way, on interruptions, we either cancel the writes or 123 * ignore the answer if the write is already done, but we don't stop the write in the middle. This 124 * adds a thread per region server in the client, so it's kept as an option. 125 * <p> 126 * The implementation is simple: the client threads adds their call to the queue, and then wait 127 * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On 128 * interruption, the client cancels its call. The CallSender checks that the call has not been 129 * canceled before writing it. 130 * </p> 131 * When the connection closes, all the calls not yet sent are dismissed. The client thread is 132 * notified with an appropriate exception, as if the call was already sent but the answer not yet 133 * received. 134 * </p> 135 */ 136 private class CallSender extends Thread { 137 138 private final Queue<Call> callsToWrite; 139 140 private final int maxQueueSize; 141 142 public CallSender(String name, Configuration conf) { 143 int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000); 144 callsToWrite = new ArrayDeque<>(queueSize); 145 this.maxQueueSize = queueSize; 146 setDaemon(true); 147 setName(name + " - writer"); 148 } 149 150 public void sendCall(final Call call) throws IOException { 151 if (callsToWrite.size() >= maxQueueSize) { 152 throw new IOException("Can't add " + call.toShortString() 153 + " to the write queue. callsToWrite.size()=" + callsToWrite.size()); 154 } 155 callsToWrite.offer(call); 156 BlockingRpcConnection.this.notifyAll(); 157 } 158 159 public void remove(Call call) { 160 callsToWrite.remove(call); 161 // By removing the call from the expected call list, we make the list smaller, but 162 // it means as well that we don't know how many calls we cancelled. 163 calls.remove(call.id); 164 call.setException(new CallCancelledException(call.toShortString() + ", waitTime=" 165 + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" 166 + call.timeout)); 167 } 168 169 /** 170 * Reads the call from the queue, write them on the socket. 171 */ 172 @Override 173 public void run() { 174 synchronized (BlockingRpcConnection.this) { 175 while (!closed) { 176 if (callsToWrite.isEmpty()) { 177 // We should use another monitor object here for better performance since the read 178 // thread also uses ConnectionImpl.this. But this makes the locking schema more 179 // complicated, can do it later as an optimization. 180 try { 181 BlockingRpcConnection.this.wait(); 182 } catch (InterruptedException e) { 183 // Restore interrupt status 184 Thread.currentThread().interrupt(); 185 } 186 // check if we need to quit, so continue the main loop instead of fallback. 187 continue; 188 } 189 Call call = callsToWrite.poll(); 190 if (call.isDone()) { 191 continue; 192 } 193 try (Scope scope = call.span.makeCurrent()) { 194 writeRequest(call); 195 } catch (IOException e) { 196 // exception here means the call has not been added to the pendingCalls yet, so we need 197 // to fail it by our own. 198 LOG.debug("call write error for {}", call.toShortString()); 199 call.setException(e); 200 closeConn(e); 201 } 202 } 203 } 204 } 205 206 /** 207 * Cleans the call not yet sent when we finish. 208 */ 209 public void cleanup(IOException e) { 210 IOException ie = 211 new ConnectionClosingException("Connection to " + remoteId.getAddress() + " is closing."); 212 for (Call call : callsToWrite) { 213 call.setException(ie); 214 } 215 callsToWrite.clear(); 216 } 217 } 218 219 BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException { 220 super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, 221 rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, 222 rpcClient.metrics, rpcClient.connectionAttributes); 223 this.rpcClient = rpcClient; 224 this.connectionHeaderPreamble = getConnectionHeaderPreamble(); 225 ConnectionHeader header = getConnectionHeader(); 226 ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize()); 227 DataOutputStream dos = new DataOutputStream(baos); 228 dos.writeInt(header.getSerializedSize()); 229 header.writeTo(dos); 230 assert baos.size() == 4 + header.getSerializedSize(); 231 this.connectionHeaderWithLength = baos.getBuffer(); 232 233 UserGroupInformation ticket = remoteId.ticket.getUGI(); 234 this.threadName = "BRPC Connection (" + this.rpcClient.socketFactory.hashCode() + ") to " 235 + remoteId.getAddress().toString() 236 + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName())); 237 238 if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) { 239 callSender = new CallSender(threadName, this.rpcClient.conf); 240 callSender.start(); 241 } else { 242 callSender = null; 243 } 244 } 245 246 // protected for write UT. 247 protected void setupConnection() throws IOException { 248 short ioFailures = 0; 249 short timeoutFailures = 0; 250 while (true) { 251 try { 252 this.socket = this.rpcClient.socketFactory.createSocket(); 253 this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay()); 254 this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive); 255 if (this.rpcClient.localAddr != null) { 256 this.socket.bind(this.rpcClient.localAddr); 257 } 258 InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics); 259 NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO); 260 this.socket.setSoTimeout(this.rpcClient.readTO); 261 return; 262 } catch (SocketTimeoutException toe) { 263 /* 264 * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries. 265 */ 266 if (LOG.isDebugEnabled()) { 267 LOG.debug( 268 "Received exception in connection setup.\n" + StringUtils.stringifyException(toe)); 269 } 270 handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe); 271 } catch (IOException ie) { 272 if (LOG.isDebugEnabled()) { 273 LOG.debug( 274 "Received exception in connection setup.\n" + StringUtils.stringifyException(ie)); 275 } 276 handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie); 277 } 278 } 279 } 280 281 /** 282 * Handle connection failures If the current number of retries is equal to the max number of 283 * retries, stop retrying and throw the exception; Otherwise backoff N seconds and try connecting 284 * again. This Method is only called from inside setupIOstreams(), which is synchronized. Hence 285 * the sleep is synchronized; the locks will be retained. 286 * @param curRetries current number of retries 287 * @param maxRetries max number of retries allowed 288 * @param ioe failure reason 289 * @throws IOException if max number of retries is reached 290 */ 291 private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe) 292 throws IOException { 293 closeSocket(); 294 295 // throw the exception if the maximum number of retries is reached 296 if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) { 297 throw ioe; 298 } 299 300 // otherwise back off and retry 301 try { 302 Thread.sleep(this.rpcClient.failureSleep); 303 } catch (InterruptedException ie) { 304 ExceptionUtil.rethrowIfInterrupt(ie); 305 } 306 307 if (LOG.isInfoEnabled()) { 308 LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping " 309 + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s)."); 310 } 311 } 312 313 /* 314 * wait till someone signals us to start reading RPC response or it is idle too long, it is marked 315 * as to be closed, or the client is marked as not running. 316 * @return true if it is time to read a response; false otherwise. 317 */ 318 private synchronized boolean waitForWork() { 319 // beware of the concurrent access to the calls list: we can add calls, but as well 320 // remove them. 321 long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose; 322 for (;;) { 323 if (thread == null) { 324 return false; 325 } 326 if (!calls.isEmpty()) { 327 return true; 328 } 329 if (EnvironmentEdgeManager.currentTime() >= waitUntil) { 330 closeConn( 331 new IOException("idle connection closed with " + calls.size() + " pending request(s)")); 332 return false; 333 } 334 try { 335 wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000)); 336 } catch (InterruptedException e) { 337 // Restore interrupt status 338 Thread.currentThread().interrupt(); 339 } 340 } 341 } 342 343 @Override 344 public void run() { 345 if (LOG.isTraceEnabled()) { 346 LOG.trace(threadName + ": starting"); 347 } 348 while (waitForWork()) { 349 readResponse(); 350 } 351 if (LOG.isTraceEnabled()) { 352 LOG.trace(threadName + ": stopped"); 353 } 354 } 355 356 private void disposeSasl() { 357 if (saslRpcClient != null) { 358 saslRpcClient.dispose(); 359 saslRpcClient = null; 360 } 361 } 362 363 private boolean setupSaslConnection(final InputStream in2, final OutputStream out2) 364 throws IOException { 365 if (this.metrics != null) { 366 this.metrics.incrNsLookups(); 367 } 368 saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token, 369 socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed, 370 this.rpcClient.conf.get("hbase.rpc.protection", 371 QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), 372 this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT)); 373 return saslRpcClient.saslConnect(in2, out2); 374 } 375 376 /** 377 * If multiple clients with the same principal try to connect to the same server at the same time, 378 * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to 379 * work around this, what is done is that the client backs off randomly and tries to initiate the 380 * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is 381 * attempted. 382 * <p> 383 * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()} method. 384 * Some providers have the ability to obtain new credentials and then re-attempt to authenticate 385 * with HBase services. Other providers will continue to fail if they failed the first time -- for 386 * those, we want to fail-fast. 387 * </p> 388 */ 389 private void handleSaslConnectionFailure(final int currRetries, final int maxRetries, 390 final Exception ex, final UserGroupInformation user) throws IOException, InterruptedException { 391 closeSocket(); 392 user.doAs(new PrivilegedExceptionAction<Object>() { 393 @Override 394 public Object run() throws IOException, InterruptedException { 395 // A provider which failed authentication, but doesn't have the ability to relogin with 396 // some external system (e.g. username/password, the password either works or it doesn't) 397 if (!provider.canRetry()) { 398 LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(), 399 ex); 400 if (ex instanceof RemoteException) { 401 throw (RemoteException) ex; 402 } 403 if (ex instanceof SaslException) { 404 String msg = "SASL authentication failed." 405 + " The most likely cause is missing or invalid credentials."; 406 throw new RuntimeException(msg, ex); 407 } 408 throw new IOException(ex); 409 } 410 411 // Other providers, like kerberos, could request a new ticket from a keytab. Let 412 // them try again. 413 if (currRetries < maxRetries) { 414 LOG.debug("Exception encountered while connecting to the server " + remoteId.getAddress(), 415 ex); 416 417 // Invoke the provider to perform the relogin 418 provider.relogin(); 419 420 // Get rid of any old state on the SaslClient 421 disposeSasl(); 422 423 // have granularity of milliseconds 424 // we are sleeping with the Connection lock held but since this 425 // connection instance is being used for connecting to the server 426 // in question, it is okay 427 Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1); 428 return null; 429 } else { 430 String msg = 431 "Failed to initiate connection for " + UserGroupInformation.getLoginUser().getUserName() 432 + " to " + securityInfo.getServerPrincipal(); 433 throw new IOException(msg, ex); 434 } 435 } 436 }); 437 } 438 439 private void setupIOstreams() throws IOException { 440 if (socket != null) { 441 // The connection is already available. Perfect. 442 return; 443 } 444 445 if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) { 446 if (LOG.isDebugEnabled()) { 447 LOG.debug("Not trying to connect to " + remoteId.getAddress() 448 + " this server is in the failed servers list"); 449 } 450 throw new FailedServerException( 451 "This server is in the failed servers list: " + remoteId.getAddress()); 452 } 453 454 try { 455 if (LOG.isDebugEnabled()) { 456 LOG.debug("Connecting to " + remoteId.getAddress()); 457 } 458 459 short numRetries = 0; 460 int reloginMaxRetries = this.rpcClient.conf.getInt("hbase.security.relogin.maxretries", 5); 461 while (true) { 462 setupConnection(); 463 InputStream inStream = NetUtils.getInputStream(socket); 464 // This creates a socket with a write timeout. This timeout cannot be changed. 465 OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO); 466 // Write out the preamble -- MAGIC, version, and auth to use. 467 writeConnectionHeaderPreamble(outStream); 468 if (useSasl) { 469 final InputStream in2 = inStream; 470 final OutputStream out2 = outStream; 471 UserGroupInformation ticket = provider.getRealUser(remoteId.ticket); 472 boolean continueSasl; 473 if (ticket == null) { 474 throw new FatalConnectionException("ticket/user is null"); 475 } 476 try { 477 continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() { 478 @Override 479 public Boolean run() throws IOException { 480 return setupSaslConnection(in2, out2); 481 } 482 }); 483 } catch (Exception ex) { 484 ExceptionUtil.rethrowIfInterrupt(ex); 485 handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket); 486 continue; 487 } 488 if (continueSasl) { 489 // Sasl connect is successful. Let's set up Sasl i/o streams. 490 inStream = saslRpcClient.getInputStream(); 491 outStream = saslRpcClient.getOutputStream(); 492 } else { 493 // fall back to simple auth because server told us so. 494 // do not change authMethod and useSasl here, we should start from secure when 495 // reconnecting because regionserver may change its sasl config after restart. 496 } 497 } 498 this.in = new DataInputStream(new BufferedInputStream(inStream)); 499 this.out = new DataOutputStream(new BufferedOutputStream(outStream)); 500 // Now write out the connection header 501 writeConnectionHeader(); 502 // process the response from server for connection header if necessary 503 processResponseForConnectionHeader(); 504 505 break; 506 } 507 } catch (Throwable t) { 508 closeSocket(); 509 IOException e = ExceptionUtil.asInterrupt(t); 510 if (e == null) { 511 this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t); 512 if (t instanceof LinkageError) { 513 // probably the hbase hadoop version does not match the running hadoop version 514 e = new DoNotRetryIOException(t); 515 } else if (t instanceof IOException) { 516 e = (IOException) t; 517 } else { 518 e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t); 519 } 520 } 521 throw e; 522 } 523 524 // start the receiver thread after the socket connection has been set up 525 thread = new Thread(this, threadName); 526 thread.setDaemon(true); 527 thread.start(); 528 } 529 530 /** 531 * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>} 532 */ 533 private void writeConnectionHeaderPreamble(OutputStream out) throws IOException { 534 out.write(connectionHeaderPreamble); 535 out.flush(); 536 } 537 538 /** 539 * Write the connection header. 540 */ 541 private void writeConnectionHeader() throws IOException { 542 boolean isCryptoAesEnable = false; 543 // check if Crypto AES is enabled 544 if (saslRpcClient != null) { 545 boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop() 546 .equalsIgnoreCase(saslRpcClient.getSaslQOP()); 547 isCryptoAesEnable = saslEncryptionEnabled 548 && conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT); 549 } 550 551 // if Crypto AES is enabled, set transformation and negotiate with server 552 if (isCryptoAesEnable) { 553 waitingConnectionHeaderResponse = true; 554 } 555 this.out.write(connectionHeaderWithLength); 556 this.out.flush(); 557 } 558 559 private void processResponseForConnectionHeader() throws IOException { 560 // if no response excepted, return 561 if (!waitingConnectionHeaderResponse) return; 562 try { 563 // read the ConnectionHeaderResponse from server 564 int len = this.in.readInt(); 565 byte[] buff = new byte[len]; 566 int readSize = this.in.read(buff); 567 if (LOG.isDebugEnabled()) { 568 LOG.debug("Length of response for connection header:" + readSize); 569 } 570 571 RPCProtos.ConnectionHeaderResponse connectionHeaderResponse = 572 RPCProtos.ConnectionHeaderResponse.parseFrom(buff); 573 574 // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher 575 if (connectionHeaderResponse.hasCryptoCipherMeta()) { 576 negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta()); 577 } 578 waitingConnectionHeaderResponse = false; 579 } catch (SocketTimeoutException ste) { 580 LOG.error(HBaseMarkers.FATAL, 581 "Can't get the connection header response for rpc timeout, " 582 + "please check if server has the correct configuration to support the additional " 583 + "function.", 584 ste); 585 // timeout when waiting the connection header response, ignore the additional function 586 throw new IOException("Timeout while waiting connection header response", ste); 587 } 588 } 589 590 private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta) throws IOException { 591 // initialize the Crypto AES with CryptoCipherMeta 592 saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf); 593 // reset the inputStream/outputStream for Crypto AES encryption 594 this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream())); 595 this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream())); 596 } 597 598 /** 599 * Initiates a call by sending the parameter to the remote server. Note: this is not called from 600 * the Connection thread, but by other threads. 601 * @see #readResponse() 602 */ 603 private void writeRequest(Call call) throws IOException { 604 ByteBuf cellBlock = null; 605 try { 606 cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor, 607 call.cells, PooledByteBufAllocator.DEFAULT); 608 CellBlockMeta cellBlockMeta; 609 if (cellBlock != null) { 610 cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build(); 611 } else { 612 cellBlockMeta = null; 613 } 614 RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta); 615 616 setupIOstreams(); 617 618 // Now we're going to write the call. We take the lock, then check that the connection 619 // is still valid, and, if so we do the write to the socket. If the write fails, we don't 620 // know where we stand, we have to close the connection. 621 if (Thread.interrupted()) { 622 throw new InterruptedIOException(); 623 } 624 625 calls.put(call.id, call); // We put first as we don't want the connection to become idle. 626 // from here, we do not throw any exception to upper layer as the call has been tracked in 627 // the pending calls map. 628 try { 629 call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); 630 } catch (Throwable t) { 631 if (LOG.isTraceEnabled()) { 632 LOG.trace("Error while writing {}", call.toShortString()); 633 } 634 IOException e = IPCUtil.toIOE(t); 635 closeConn(e); 636 return; 637 } 638 } finally { 639 if (cellBlock != null) { 640 cellBlock.release(); 641 } 642 } 643 notifyAll(); 644 } 645 646 /* 647 * Receive a response. Because only one receiver, so no synchronization on in. 648 */ 649 private void readResponse() { 650 Call call = null; 651 boolean expectedCall = false; 652 try { 653 // See HBaseServer.Call.setResponse for where we write out the response. 654 // Total size of the response. Unused. But have to read it in anyways. 655 int totalSize = in.readInt(); 656 657 // Read the header 658 ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); 659 int id = responseHeader.getCallId(); 660 call = calls.remove(id); // call.done have to be set before leaving this method 661 expectedCall = (call != null && !call.isDone()); 662 if (!expectedCall) { 663 // So we got a response for which we have no corresponding 'call' here on the client-side. 664 // We probably timed out waiting, cleaned up all references, and now the server decides 665 // to return a response. There is nothing we can do w/ the response at this stage. Clean 666 // out the wire of the response so its out of the way and we can get other responses on 667 // this connection. 668 int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader); 669 int whatIsLeftToRead = totalSize - readSoFar; 670 IOUtils.skipFully(in, whatIsLeftToRead); 671 if (call != null) { 672 call.callStats.setResponseSizeBytes(totalSize); 673 call.callStats 674 .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); 675 } 676 return; 677 } 678 if (responseHeader.hasException()) { 679 ExceptionResponse exceptionResponse = responseHeader.getException(); 680 RemoteException re = createRemoteException(exceptionResponse); 681 call.setException(re); 682 call.callStats.setResponseSizeBytes(totalSize); 683 call.callStats 684 .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); 685 if (isFatalConnectionException(exceptionResponse)) { 686 synchronized (this) { 687 closeConn(re); 688 } 689 } 690 } else { 691 Message value = null; 692 if (call.responseDefaultType != null) { 693 Message.Builder builder = call.responseDefaultType.newBuilderForType(); 694 ProtobufUtil.mergeDelimitedFrom(builder, in); 695 value = builder.build(); 696 } 697 CellScanner cellBlockScanner = null; 698 if (responseHeader.hasCellBlockMeta()) { 699 int size = responseHeader.getCellBlockMeta().getLength(); 700 byte[] cellBlock = new byte[size]; 701 IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); 702 cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec, 703 this.compressor, cellBlock); 704 } 705 call.setResponse(value, cellBlockScanner); 706 call.callStats.setResponseSizeBytes(totalSize); 707 call.callStats 708 .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); 709 } 710 } catch (IOException e) { 711 if (expectedCall) { 712 call.setException(e); 713 } 714 if (e instanceof SocketTimeoutException) { 715 // Clean up open calls but don't treat this as a fatal condition, 716 // since we expect certain responses to not make it by the specified 717 // {@link ConnectionId#rpcTimeout}. 718 if (LOG.isTraceEnabled()) { 719 LOG.trace("ignored", e); 720 } 721 } else { 722 synchronized (this) { 723 closeConn(e); 724 } 725 } 726 } 727 } 728 729 @Override 730 protected synchronized void callTimeout(Call call) { 731 // call sender 732 calls.remove(call.id); 733 } 734 735 // just close socket input and output. 736 private void closeSocket() { 737 IOUtils.closeStream(out); 738 IOUtils.closeStream(in); 739 IOUtils.closeSocket(socket); 740 out = null; 741 in = null; 742 socket = null; 743 } 744 745 // close socket, reader, and clean up all pending calls. 746 private void closeConn(IOException e) { 747 if (thread == null) { 748 return; 749 } 750 thread.interrupt(); 751 thread = null; 752 closeSocket(); 753 if (callSender != null) { 754 callSender.cleanup(e); 755 } 756 for (Call call : calls.values()) { 757 call.setException(e); 758 } 759 calls.clear(); 760 } 761 762 // release all resources, the connection will not be used any more. 763 @Override 764 public synchronized void shutdown() { 765 closed = true; 766 if (callSender != null) { 767 callSender.interrupt(); 768 } 769 closeConn(new IOException("connection to " + remoteId.getAddress() + " closed")); 770 } 771 772 @Override 773 public void cleanupConnection() { 774 // do nothing 775 } 776 777 @Override 778 public synchronized void sendRequest(final Call call, HBaseRpcController pcrc) 779 throws IOException { 780 pcrc.notifyOnCancel(new RpcCallback<Object>() { 781 782 @Override 783 public void run(Object parameter) { 784 setCancelled(call); 785 synchronized (BlockingRpcConnection.this) { 786 if (callSender != null) { 787 callSender.remove(call); 788 } else { 789 calls.remove(call.id); 790 } 791 } 792 } 793 }, new CancellationCallback() { 794 795 @Override 796 public void run(boolean cancelled) throws IOException { 797 if (cancelled) { 798 setCancelled(call); 799 return; 800 } 801 scheduleTimeoutTask(call); 802 if (callSender != null) { 803 callSender.sendCall(call); 804 } else { 805 // this is in the same thread with the caller so do not need to attach the trace context 806 // again. 807 writeRequest(call); 808 } 809 } 810 }); 811 } 812 813 @Override 814 public synchronized boolean isActive() { 815 return thread != null; 816 } 817}