/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException;
import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited;
import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.security.sasl.SaslException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;

/**
 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
 * remote address. Calls are multiplexed through this socket: responses may be delivered out of
 * order.
 */
@InterfaceAudience.Private
class BlockingRpcConnection extends RpcConnection implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);

  private final BlockingRpcClient rpcClient;

  private final String threadName;
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "We are always under lock actually")
  private Thread thread;

  // connected socket. protected for unit tests.
  protected Socket socket = null;
  private DataInputStream in;
  private DataOutputStream out;

  private HBaseSaslRpcClient saslRpcClient;

  // currently active calls
  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();

  private final CallSender callSender;

  private boolean closed = false;

  private byte[] connectionHeaderPreamble;

  private byte[] connectionHeaderWithLength;

  private boolean waitingConnectionHeaderResponse = false;
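
  // A minimal usage sketch for the dedicated writer thread documented below (client side,
  // assuming only the constants already referenced in this file): the writer thread is opt-in,
  // and its pending-write queue is bounded.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, true); // enable CallSender
  //   conf.setInt("hbase.ipc.client.write.queueSize", 1000);          // default queue bound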
  /**
   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is
   * to use a different thread for writing. This way, on interruptions, we either cancel the writes
   * or ignore the answer if the write is already done, but we don't stop the write in the middle.
   * This adds a thread per region server in the client, so it's kept as an option.
   * <p>
   * The implementation is simple: the client threads add their calls to the queue, and then wait
   * for an answer. The CallSender blocks on the queue, and writes the calls one after the other.
   * On interruption, the client cancels its call. The CallSender checks that the call has not been
   * canceled before writing it.
   * </p>
   * <p>
   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
   * notified with an appropriate exception, as if the call was already sent but the answer not yet
   * received.
   * </p>
   */
  private class CallSender extends Thread {

    private final Queue<Call> callsToWrite;

    private final int maxQueueSize;

    public CallSender(String name, Configuration conf) {
      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
      callsToWrite = new ArrayDeque<>(queueSize);
      this.maxQueueSize = queueSize;
      setDaemon(true);
      setName(name + " - writer");
    }

    public void sendCall(final Call call) throws IOException {
      if (callsToWrite.size() >= maxQueueSize) {
        throw new IOException("Can't add " + call.toShortString()
            + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
      }
      callsToWrite.offer(call);
      BlockingRpcConnection.this.notifyAll();
    }

    public void remove(Call call) {
      callsToWrite.remove(call);
      // By removing the call from the expected call list, we make the list smaller, but
      // it also means that we don't know how many calls we cancelled.
      calls.remove(call.id);
      call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
          + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
          + call.timeout));
    }

    /**
     * Reads calls from the queue and writes them to the socket, one after the other.
     */
    @Override
    public void run() {
      synchronized (BlockingRpcConnection.this) {
        while (!closed) {
          if (callsToWrite.isEmpty()) {
            // We should use another monitor object here for better performance since the read
            // thread also uses BlockingRpcConnection.this. But this makes the locking schema
            // more complicated, we can do it later as an optimization.
            try {
              BlockingRpcConnection.this.wait();
            } catch (InterruptedException e) {
            }
            // check if we need to quit, so continue the main loop instead of falling through.
            continue;
          }
          Call call = callsToWrite.poll();
          if (call.isDone()) {
            continue;
          }
          try {
            tracedWriteRequest(call);
          } catch (IOException e) {
            // exception here means the call has not been added to the pendingCalls yet, so we
            // need to fail it on our own.
            LOG.debug("call write error for {}", call.toShortString());
            call.setException(e);
            closeConn(e);
          }
        }
      }
    }
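
    // Note on the wait/notify protocol above: sendCall() and writeRequest() invoke
    // BlockingRpcConnection.this.notifyAll() while holding the connection lock, which wakes
    // both this writer (when the queue becomes non-empty) and a reader parked in waitForWork().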
    /**
     * Fails all the calls not yet sent when the connection is closing.
     */
    public void cleanup(IOException e) {
      IOException ie = new ConnectionClosingException(
          "Connection to " + remoteId.address + " is closing.");
      for (Call call : callsToWrite) {
        call.setException(ie);
      }
      callsToWrite.clear();
    }
  }

  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
    this.rpcClient = rpcClient;
    if (remoteId.getAddress().isUnresolved()) {
      throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
    }

    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
    ConnectionHeader header = getConnectionHeader();
    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeInt(header.getSerializedSize());
    header.writeTo(dos);
    assert baos.size() == 4 + header.getSerializedSize();
    this.connectionHeaderWithLength = baos.getBuffer();

    UserGroupInformation ticket = remoteId.ticket.getUGI();
    this.threadName = "BRPC Connection (" + this.rpcClient.socketFactory.hashCode() + ") to "
        + remoteId.getAddress().toString()
        + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));

    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
      callSender = new CallSender(threadName, this.rpcClient.conf);
      callSender.start();
    } else {
      callSender = null;
    }
  }

  // protected for unit tests.
  protected void setupConnection() throws IOException {
    short ioFailures = 0;
    short timeoutFailures = 0;
    while (true) {
      try {
        this.socket = this.rpcClient.socketFactory.createSocket();
        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
        if (this.rpcClient.localAddr != null) {
          this.socket.bind(this.rpcClient.localAddr);
        }
        NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO);
        this.socket.setSoTimeout(this.rpcClient.readTO);
        return;
      } catch (SocketTimeoutException toe) {
        /*
         * The max number of retries is 45, which amounts to 20s * 45 = 15 minutes of retries.
         */
        if (LOG.isDebugEnabled()) {
          LOG.debug("Received exception in connection setup.\n"
              + StringUtils.stringifyException(toe));
        }
        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
      } catch (IOException ie) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Received exception in connection setup.\n"
              + StringUtils.stringifyException(ie));
        }
        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
      }
    }
  }
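
  // Back-of-the-envelope retry budget (a rough sketch using the client fields referenced
  // above): each failed attempt costs at most the connect timeout (connectTO) plus one
  // failureSleep in handleConnectionFailure(), so the worst case before the last exception is
  // rethrown is roughly maxRetries * (connectTO + failureSleep).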
  /**
   * Handle connection failures. If the current number of retries is equal to the max number of
   * retries, stop retrying and throw the exception; otherwise back off and try connecting again.
   * This method is only called from inside setupIOstreams(), which is synchronized. Hence the
   * sleep is synchronized; the locks will be retained.
   * @param curRetries current number of retries
   * @param maxRetries max number of retries allowed
   * @param ioe failure reason
   * @throws IOException if the max number of retries is reached
   */
  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
      throws IOException {
    closeSocket();

    // throw the exception if the maximum number of retries is reached
    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
      throw ioe;
    }

    // otherwise back off and retry
    try {
      Thread.sleep(this.rpcClient.failureSleep);
    } catch (InterruptedException ie) {
      ExceptionUtil.rethrowIfInterrupt(ie);
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping "
          + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s).");
    }
  }

  /*
   * Wait until someone signals us that there is an RPC response to read, or until the connection
   * has been idle too long, is marked as to be closed, or the client is marked as not running.
   * @return true if it is time to read a response; false otherwise.
   */
  private synchronized boolean waitForWork() {
    // beware of the concurrent access to the calls list: we can add calls, but as well
    // remove them.
    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
    for (;;) {
      if (thread == null) {
        return false;
      }
      if (!calls.isEmpty()) {
        return true;
      }
      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
        closeConn(
            new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
        return false;
      }
      try {
        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
      } catch (InterruptedException e) {
      }
    }
  }

  @Override
  public void run() {
    if (LOG.isTraceEnabled()) {
      LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size());
    }
    while (waitForWork()) {
      readResponse();
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size());
    }
  }

  private void disposeSasl() {
    if (saslRpcClient != null) {
      saslRpcClient.dispose();
      saslRpcClient = null;
    }
  }

  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
      throws IOException {
    saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token, serverAddress,
        securityInfo, this.rpcClient.fallbackAllowed,
        this.rpcClient.conf.get("hbase.rpc.protection",
            QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
        this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
    return saslRpcClient.saslConnect(in2, out2);
  }
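
  // Configuration sketch for the SASL negotiation above (the value shown is illustrative):
  // the quality of protection is read from "hbase.rpc.protection" and maps onto
  // SaslUtil.QualityOfProtection, e.g.
  //   conf.set("hbase.rpc.protection", "privacy"); // authentication | integrity | privacy
  // with "authentication" being the default used above.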
  /**
   * If multiple clients with the same principal try to connect to the same server at the same
   * time, the server assumes a replay attack is in progress. This is a feature of kerberos. In
   * order to work around this, what is done is that the client backs off randomly and tries to
   * initiate the connection again. The other problem is to do with ticket expiry. To handle that,
   * a relogin is attempted.
   * <p>
   * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()}
   * method. Some providers have the ability to obtain new credentials and then re-attempt to
   * authenticate with HBase services. Other providers will continue to fail if they failed the
   * first time -- for those, we want to fail-fast.
   * </p>
   */
  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
      final Exception ex, final UserGroupInformation user)
      throws IOException, InterruptedException {
    closeSocket();
    user.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException, InterruptedException {
        // A provider which failed authentication, but doesn't have the ability to relogin with
        // some external system (e.g. username/password, the password either works or it doesn't)
        if (!provider.canRetry()) {
          LOG.warn("Exception encountered while connecting to the server : " + ex);
          if (ex instanceof RemoteException) {
            throw (RemoteException) ex;
          }
          if (ex instanceof SaslException) {
            String msg = "SASL authentication failed."
                + " The most likely cause is missing or invalid credentials.";
            throw new RuntimeException(msg, ex);
          }
          throw new IOException(ex);
        }

        // Other providers, like kerberos, could request a new ticket from a keytab. Let
        // them try again.
        if (currRetries < maxRetries) {
          LOG.debug("Exception encountered while connecting to the server", ex);

          // Invoke the provider to perform the relogin
          provider.relogin();

          // Get rid of any old state on the SaslClient
          disposeSasl();

          // have granularity of milliseconds
          // we are sleeping with the Connection lock held but since this
          // connection instance is being used for connecting to the server
          // in question, it is okay
          Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
          return null;
        } else {
          String msg = "Failed to initiate connection for "
              + UserGroupInformation.getLoginUser().getUserName() + " to "
              + securityInfo.getServerPrincipal();
          throw new IOException(msg, ex);
        }
      }
    });
  }

  private void setupIOstreams() throws IOException {
    if (socket != null) {
      // The connection is already available. Perfect.
      return;
    }

    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not trying to connect to " + remoteId.address
            + ", this server is in the failed servers list");
      }
      throw new FailedServerException(
          "This server is in the failed servers list: " + remoteId.address);
    }

    try {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Connecting to " + remoteId.address);
      }

      short numRetries = 0;
      int reloginMaxRetries = this.rpcClient.conf.getInt("hbase.security.relogin.maxretries", 5);
      while (true) {
        setupConnection();
        InputStream inStream = NetUtils.getInputStream(socket);
        // This creates a socket with a write timeout. This timeout cannot be changed.
        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
        // Write out the preamble -- MAGIC, version, and auth to use.
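        // Preamble layout, per the javadoc on writeConnectionHeaderPreamble() below:
        // 4 bytes of magic ('HBas'), 1 version byte, 1 auth-type byte -- 6 bytes in total.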
        writeConnectionHeaderPreamble(outStream);
        if (useSasl) {
          final InputStream in2 = inStream;
          final OutputStream out2 = outStream;
          UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
          boolean continueSasl;
          if (ticket == null) {
            throw new FatalConnectionException("ticket/user is null");
          }
          try {
            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
              @Override
              public Boolean run() throws IOException {
                return setupSaslConnection(in2, out2);
              }
            });
          } catch (Exception ex) {
            ExceptionUtil.rethrowIfInterrupt(ex);
            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket);
            continue;
          }
          if (continueSasl) {
            // Sasl connect is successful. Let's set up Sasl i/o streams.
            inStream = saslRpcClient.getInputStream();
            outStream = saslRpcClient.getOutputStream();
          } else {
            // fall back to simple auth because server told us so.
            // do not change authMethod and useSasl here, we should start from secure when
            // reconnecting because regionserver may change its sasl config after restart.
          }
        }
        this.in = new DataInputStream(new BufferedInputStream(inStream));
        this.out = new DataOutputStream(new BufferedOutputStream(outStream));
        // Now write out the connection header
        writeConnectionHeader();
        // process the response from server for connection header if necessary
        processResponseForConnectionHeader();

        break;
      }
    } catch (Throwable t) {
      closeSocket();
      IOException e = ExceptionUtil.asInterrupt(t);
      if (e == null) {
        this.rpcClient.failedServers.addToFailedServers(remoteId.address, t);
        if (t instanceof LinkageError) {
          // probably the hbase hadoop version does not match the running hadoop version
          e = new DoNotRetryIOException(t);
        } else if (t instanceof IOException) {
          e = (IOException) t;
        } else {
          e = new IOException("Could not set up IO Streams to " + remoteId.address, t);
        }
      }
      throw e;
    }

    // start the receiver thread after the socket connection has been set up
    thread = new Thread(this, threadName);
    thread.setDaemon(true);
    thread.start();
  }

  /**
   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
   */
  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
    out.write(connectionHeaderPreamble);
    out.flush();
  }
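
  // Wire framing of the connection header sent below, as built in the constructor:
  //   int32 length | ConnectionHeader protobuf bytes
  // i.e. a plain 4-byte big-endian length prefix (DataOutputStream.writeInt) rather than the
  // varint-delimited form used for request and response messages.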
  /**
   * Write the connection header.
   */
  private void writeConnectionHeader() throws IOException {
    boolean isCryptoAesEnable = false;
    // check if Crypto AES is enabled
    if (saslRpcClient != null) {
      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
          .equalsIgnoreCase(saslRpcClient.getSaslQOP());
      isCryptoAesEnable = saslEncryptionEnabled
          && conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
    }

    // if Crypto AES is enabled, set transformation and negotiate with server
    if (isCryptoAesEnable) {
      waitingConnectionHeaderResponse = true;
    }
    this.out.write(connectionHeaderWithLength);
    this.out.flush();
  }

  private void processResponseForConnectionHeader() throws IOException {
    // if no response expected, return
    if (!waitingConnectionHeaderResponse) return;
    try {
      // read the ConnectionHeaderResponse from server
      int len = this.in.readInt();
      byte[] buff = new byte[len];
      // use readFully instead of a single read: a plain read(buff) may return fewer bytes
      // than requested, which would leave the protobuf parse below with a truncated buffer.
      this.in.readFully(buff);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Length of response for connection header:" + len);
      }

      RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
          RPCProtos.ConnectionHeaderResponse.parseFrom(buff);

      // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
      if (connectionHeaderResponse.hasCryptoCipherMeta()) {
        negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
      }
      waitingConnectionHeaderResponse = false;
    } catch (SocketTimeoutException ste) {
      LOG.error(HBaseMarkers.FATAL, "Can't get the connection header response for rpc timeout, "
          + "please check if server has the correct configuration to support the additional "
          + "function.", ste);
      // timed out while waiting for the connection header response; give up on the additional
      // function
      throw new IOException("Timeout while waiting connection header response", ste);
    }
  }

  private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
      throws IOException {
    // initialize the Crypto AES with CryptoCipherMeta
    saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
    // reset the inputStream/outputStream for Crypto AES encryption
    this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
    this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
  }

  private void tracedWriteRequest(Call call) throws IOException {
    try (TraceScope ignored = TraceUtil.createTrace("RpcClientImpl.tracedWriteRequest",
        call.span)) {
      writeRequest(call);
    }
  }
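
  // Request framing produced by IPCUtil.write() in writeRequest() below (a sketch of the
  // layout, not a new format):
  //   int32 totalLength | delimited RequestHeader | delimited request Message | cell block
  // where "delimited" means the protobuf varint-length-prefixed encoding.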
  /**
   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
   * the Connection thread, but by other threads.
   * @see #readResponse()
   */
  private void writeRequest(Call call) throws IOException {
    ByteBuf cellBlock = null;
    try {
      cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
          call.cells, PooledByteBufAllocator.DEFAULT);
      CellBlockMeta cellBlockMeta;
      if (cellBlock != null) {
        cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
      } else {
        cellBlockMeta = null;
      }
      RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);

      setupIOstreams();

      // Now we're going to write the call. We take the lock, then check that the connection
      // is still valid, and, if so we do the write to the socket. If the write fails, we don't
      // know where we stand, we have to close the connection.
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      calls.put(call.id, call); // We put first as we don't want the connection to become idle.
      // from here, we do not throw any exception to upper layer as the call has been tracked in
      // the pending calls map.
      try {
        call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
      } catch (Throwable t) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Error while writing {}", call.toShortString());
        }
        IOException e = IPCUtil.toIOE(t);
        closeConn(e);
        return;
      }
    } finally {
      if (cellBlock != null) {
        cellBlock.release();
      }
    }
    notifyAll();
  }

  /*
   * Receive a response. Because there is only one receiver thread, no synchronization on the
   * input stream is needed.
   */
  private void readResponse() {
    Call call = null;
    boolean expectedCall = false;
    try {
      // See HBaseServer.Call.setResponse for where we write out the response.
      // Total size of the response. Unused. But have to read it in anyways.
      int totalSize = in.readInt();

      // Read the header
      ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
      int id = responseHeader.getCallId();
      call = calls.remove(id); // call.done has to be set before leaving this method
      expectedCall = (call != null && !call.isDone());
      if (!expectedCall) {
        // So we got a response for which we have no corresponding 'call' here on the client-side.
        // We probably timed out waiting, cleaned up all references, and now the server decides
        // to return a response. There is nothing we can do w/ the response at this stage. Clean
        // out the wire of the response so its out of the way and we can get other responses on
        // this connection.
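        // totalSize counts everything after the 4-byte length prefix, so subtracting what the
        // delimited ResponseHeader consumed (its varint prefix plus the header bytes, as
        // computed by getTotalSizeWhenWrittenDelimited) leaves exactly the body bytes to skip.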
        int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
        int whatIsLeftToRead = totalSize - readSoFar;
        IOUtils.skipFully(in, whatIsLeftToRead);
        if (call != null) {
          call.callStats.setResponseSizeBytes(totalSize);
          call.callStats
              .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
        }
        return;
      }
      if (responseHeader.hasException()) {
        ExceptionResponse exceptionResponse = responseHeader.getException();
        RemoteException re = createRemoteException(exceptionResponse);
        call.setException(re);
        call.callStats.setResponseSizeBytes(totalSize);
        call.callStats
            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
        if (isFatalConnectionException(exceptionResponse)) {
          synchronized (this) {
            closeConn(re);
          }
        }
      } else {
        Message value = null;
        if (call.responseDefaultType != null) {
          Builder builder = call.responseDefaultType.newBuilderForType();
          ProtobufUtil.mergeDelimitedFrom(builder, in);
          value = builder.build();
        }
        CellScanner cellBlockScanner = null;
        if (responseHeader.hasCellBlockMeta()) {
          int size = responseHeader.getCellBlockMeta().getLength();
          byte[] cellBlock = new byte[size];
          IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
          cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
              this.compressor, cellBlock);
        }
        call.setResponse(value, cellBlockScanner);
        call.callStats.setResponseSizeBytes(totalSize);
        call.callStats
            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
      }
    } catch (IOException e) {
      if (expectedCall) {
        call.setException(e);
      }
      if (e instanceof SocketTimeoutException) {
        // Clean up open calls but don't treat this as a fatal condition,
        // since we expect certain responses to not make it by the specified
        // {@link ConnectionId#rpcTimeout}.
        if (LOG.isTraceEnabled()) {
          LOG.trace("ignored", e);
        }
      } else {
        synchronized (this) {
          closeConn(e);
        }
      }
    }
  }

  @Override
  protected synchronized void callTimeout(Call call) {
    // the timeout task has already failed the call; just drop it from the pending calls map
    calls.remove(call.id);
  }

  // just close socket input and output.
  private void closeSocket() {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(socket);
    out = null;
    in = null;
    socket = null;
  }

  // close socket, reader, and clean up all pending calls.
  private void closeConn(IOException e) {
    if (thread == null) {
      return;
    }
    thread.interrupt();
    thread = null;
    closeSocket();
    if (callSender != null) {
      callSender.cleanup(e);
    }
    for (Call call : calls.values()) {
      call.setException(e);
    }
    calls.clear();
  }
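
  // Note: closeConn() may run on the reader thread itself (from readResponse) or on a client
  // thread. Thread.interrupt() wakes a reader parked in waitForWork(); a reader blocked on a
  // socket read is unblocked by closeSocket() closing the underlying streams instead.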
  // release all resources; the connection will not be used any more.
  @Override
  public synchronized void shutdown() {
    closed = true;
    if (callSender != null) {
      callSender.interrupt();
    }
    closeConn(new IOException("connection to " + remoteId.address + " closed"));
  }

  @Override
  public void cleanupConnection() {
    // do nothing
  }

  @Override
  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
      throws IOException {
    pcrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        synchronized (BlockingRpcConnection.this) {
          if (callSender != null) {
            callSender.remove(call);
          } else {
            calls.remove(call.id);
          }
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
          return;
        }
        scheduleTimeoutTask(call);
        if (callSender != null) {
          callSender.sendCall(call);
        } else {
          // write directly from the caller thread; an interrupt during the write will close
          // the connection (see the CallSender javadoc above for the alternative).
          tracedWriteRequest(call);
        }
      }
    });
  }

  @Override
  public synchronized boolean isActive() {
    return thread != null;
  }
}