/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;

import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
 * remote address. Calls are multiplexed through this socket: responses may be delivered out of
 * order.
 */
@InterfaceAudience.Private
class BlockingRpcConnection extends RpcConnection implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);

  private final BlockingRpcClient rpcClient;

  private final String threadName;
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "We are always under lock actually")
  private Thread thread;

  // Used for ensuring two reader threads don't run over each other. Should only be used in the
  // reader thread's run() method, to avoid deadlocks with synchronization on BlockingRpcConnection.
  private final Object readerThreadLock = new Object();

  // Used to suffix the threadName so that we can differentiate connection attempts in logs and
  // thread dumps.
  private final AtomicInteger attempts = new AtomicInteger();

  // connected socket. protected for writing UT.
  protected Socket socket = null;
  private DataInputStream in;
  private DataOutputStream out;

  private HBaseSaslRpcClient saslRpcClient;

  // currently active calls
  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();

  private final CallSender callSender;

  private boolean closed = false;

  private byte[] connectionHeaderPreamble;

  private byte[] connectionHeaderWithLength;

  private boolean waitingConnectionHeaderResponse = false;

  /**
   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
   * use a different thread for writing. This way, on interruptions, we either cancel the writes or
   * ignore the answer if the write is already done, but we don't stop the write in the middle. This
   * adds a thread per region server in the client, so it's kept as an option.
   * <p>
   * The implementation is simple: the client threads add their calls to the queue, and then wait
   * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
   * interruption, the client cancels its call. The CallSender checks that the call has not been
   * canceled before writing it.
   * </p>
   * <p>
   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
   * notified with an appropriate exception, as if the call was already sent but the answer not yet
   * received.
   * </p>
   */
  private class CallSender extends Thread {

    private final Queue<Call> callsToWrite;

    private final int maxQueueSize;

    public CallSender(String name, Configuration conf) {
      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
      callsToWrite = new ArrayDeque<>(queueSize);
      this.maxQueueSize = queueSize;
      setDaemon(true);
      setName(name + " - writer");
    }

    public void sendCall(final Call call) throws IOException {
      if (callsToWrite.size() >= maxQueueSize) {
        throw new IOException("Can't add " + call.toShortString()
          + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
      }
      callsToWrite.offer(call);
      BlockingRpcConnection.this.notifyAll();
    }

    public void remove(Call call) {
      callsToWrite.remove(call);
      // By removing the call from the expected call list, we make the list smaller, but
      // it also means that we don't know how many calls we cancelled.
      calls.remove(call.id);
      call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
        + call.timeout));
    }

    /**
     * Reads calls from the queue and writes them to the socket.
     */
    @Override
    public void run() {
      synchronized (BlockingRpcConnection.this) {
        while (!closed) {
          if (callsToWrite.isEmpty()) {
            // We should use another monitor object here for better performance since the read
            // thread also uses ConnectionImpl.this. But this makes the locking schema more
            // complicated, can do it later as an optimization.
            try {
              BlockingRpcConnection.this.wait();
            } catch (InterruptedException e) {
              // Restore interrupt status
              Thread.currentThread().interrupt();
            }
            // check if we need to quit, so continue the main loop instead of falling through.
            continue;
          }
          Call call = callsToWrite.poll();
          if (call.isDone()) {
            continue;
          }
          try (Scope scope = call.span.makeCurrent()) {
            writeRequest(call);
          } catch (IOException e) {
            // An exception here means the call has not been added to the pendingCalls yet, so we
            // need to fail it on our own.
            LOG.debug("call write error for {}", call.toShortString());
            call.setException(e);
            closeConn(e);
          }
        }
      }
    }

    /**
     * Cleans up the calls not yet sent when we finish.
     */
    public void cleanup(IOException e) {
      IOException ie =
        new ConnectionClosingException("Connection to " + remoteId.getAddress() + " is closing.");
      for (Call call : callsToWrite) {
        call.setException(ie);
      }
      callsToWrite.clear();
    }
  }

  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
      rpcClient.cellBlockBuilder, rpcClient.metrics, rpcClient.connectionAttributes);
    this.rpcClient = rpcClient;
    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
    ConnectionHeader header = getConnectionHeader();
    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeInt(header.getSerializedSize());
    header.writeTo(dos);
    assert baos.size() == 4 + header.getSerializedSize();
    this.connectionHeaderWithLength = baos.getBuffer();

    UserGroupInformation ticket = remoteId.ticket.getUGI();
    this.threadName = "BRPC Connection (" + this.rpcClient.socketFactory.hashCode() + ") to "
      + remoteId.getAddress().toString()
      + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));

    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
      callSender = new CallSender(threadName, this.rpcClient.conf);
      callSender.start();
    } else {
      callSender = null;
    }
  }

  // protected for writing UT.
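  // Creates the socket, applies the TCP options, optionally binds the configured local address,
  // and connects with the configured timeouts; on failure it defers to handleConnectionFailure,
  // which either rethrows or sleeps and lets this loop try again.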
  protected void setupConnection() throws IOException {
    short ioFailures = 0;
    short timeoutFailures = 0;
    while (true) {
      try {
        this.socket = this.rpcClient.socketFactory.createSocket();
        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
        if (this.rpcClient.localAddr != null) {
          this.socket.bind(this.rpcClient.localAddr);
        }
        InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
        NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
        this.socket.setSoTimeout(this.rpcClient.readTO);
        return;
      } catch (SocketTimeoutException toe) {
        /*
         * The max number of retries is 45, which amounts to 45 * 20s = 15 minutes of retries.
         */
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(toe));
        }
        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
      } catch (IOException ie) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(ie));
        }
        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
      }
    }
  }

  /**
   * Handles connection failures. If the current number of retries is equal to the max number of
   * retries, stop retrying and throw the exception; otherwise back off N seconds and try
   * connecting again. This method is only called from inside setupIOstreams(), which is
   * synchronized. Hence the sleep is synchronized; the locks will be retained.
   * @param curRetries current number of retries
   * @param maxRetries max number of retries allowed
   * @param ioe failure reason
   * @throws IOException if max number of retries is reached
   */
  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
    throws IOException {
    closeSocket();

    // throw the exception if the maximum number of retries is reached
    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
      throw ioe;
    }

    // otherwise back off and retry
    try {
      Thread.sleep(this.rpcClient.failureSleep);
    } catch (InterruptedException ie) {
      ExceptionUtil.rethrowIfInterrupt(ie);
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping "
        + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s).");
    }
  }

  /*
   * Wait until someone signals us to start reading an RPC response, or until the connection has
   * been idle too long, is marked to be closed, or the client is marked as not running.
   * @return true if it is time to read a response; false otherwise.
   */
  private synchronized boolean waitForWork() {
    // beware of the concurrent access to the calls list: we can add calls, but also remove them.
    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
    for (;;) {
      if (thread == null) {
        return false;
      }

      // If closeConn is called while we are in the readResponse method, it's possible that a new
      // call to setupIOstreams comes in and creates a new value for "thread" before readResponse
      // finishes. Once readResponse finishes, it will come in here and thread will be non-null
      // above, but pointing at a new thread. In that case, we should end, to avoid a situation
      // where two threads are forever competing for the same socket.
      if (!isCurrentThreadExpected()) {
        LOG.debug("Thread replaced by new connection thread. Ending waitForWork loop.");
        return false;
      }

      if (!calls.isEmpty()) {
        return true;
      }
      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
        closeConn(
          new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
        return false;
      }
      try {
        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
      } catch (InterruptedException e) {
        // Restore interrupt status
        Thread.currentThread().interrupt();

        String msg = "Interrupted while waiting for work";

        // If we were interrupted by closeConn, it would have set thread to null.
        // We are synchronized here and if we somehow got interrupted without setting thread to
        // null, we want to make sure the connection is closed since the read thread would be dead.
        // Rather than do a null check here, we check if the current thread is the expected thread.
        // This guards against the case where a call to setupIOstreams got the synchronized lock
        // first after closeConn, thus changing the thread to a new thread.
        if (isCurrentThreadExpected()) {
          LOG.debug(msg + ", closing connection");
          closeConn(new InterruptedIOException(msg));
        } else {
          LOG.debug(msg);
        }

        return false;
      }
    }
  }

  @Override
  public void run() {
    if (LOG.isTraceEnabled()) {
      LOG.trace("starting");
    }

    // We have a synchronization here because it's possible in error scenarios for a new
    // thread to be started while readResponse is still reading on the socket. We don't want
    // two threads to be reading from the same socket/inputstream.
    // The calls below can synchronize on "BlockingRpcConnection.this".
    // We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks.
    synchronized (readerThreadLock) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("started");
      }
      while (waitForWork()) {
        readResponse();
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("stopped");
    }
  }

  private void disposeSasl() {
    if (saslRpcClient != null) {
      saslRpcClient.dispose();
      saslRpcClient = null;
    }
  }

  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2,
    String serverPrincipal) throws IOException {
    if (this.metrics != null) {
      this.metrics.incrNsLookups();
    }
    saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
      socket.getInetAddress(), serverPrincipal, this.rpcClient.fallbackAllowed,
      this.rpcClient.conf.get("hbase.rpc.protection",
        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
      this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
    return saslRpcClient.saslConnect(in2, out2);
  }

  /**
   * If multiple clients with the same principal try to connect to the same server at the same
   * time, the server assumes a replay attack is in progress. This is a feature of Kerberos. To
   * work around this, the client backs off randomly and tries to initiate the connection again.
   * The other problem has to do with ticket expiry; to handle that, a relogin is attempted.
   * <p>
   * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()} method.
   * Some providers have the ability to obtain new credentials and then re-attempt to authenticate
   * with HBase services. Other providers will continue to fail if they failed the first time -- for
   * those, we want to fail-fast.
   * </p>
   */
  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
    final Exception ex, final UserGroupInformation user, final String serverPrincipal)
    throws IOException, InterruptedException {
    closeSocket();
    user.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException, InterruptedException {
        // A provider which failed authentication, but doesn't have the ability to relogin with
        // some external system (e.g. username/password, the password either works or it doesn't)
        if (!provider.canRetry()) {
          LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);
          if (ex instanceof RemoteException) {
            throw (RemoteException) ex;
          }
          if (ex instanceof SaslException) {
            String msg = "SASL authentication failed."
              + " The most likely cause is missing or invalid credentials.";
            throw new RuntimeException(msg, ex);
          }
          throw new IOException(ex);
        }

        // Other providers, like Kerberos, could request a new ticket from a keytab. Let
        // them try again.
        if (currRetries < maxRetries) {
          LOG.debug("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);

          // Invoke the provider to perform the relogin
          provider.relogin();

          // Get rid of any old state on the SaslClient
          disposeSasl();

          // have granularity of milliseconds
          // we are sleeping with the Connection lock held but since this
          // connection instance is being used for connecting to the server
          // in question, it is okay
          Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
          return null;
        } else {
          String msg = "Failed to initiate connection for "
            + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
          throw new IOException(msg, ex);
        }
      }
    });
  }

  private void getConnectionRegistry(InputStream inStream, OutputStream outStream,
    Call connectionRegistryCall) throws IOException {
    outStream.write(RpcClient.REGISTRY_PREAMBLE_HEADER);
    readResponse(new DataInputStream(inStream), calls, connectionRegistryCall, remoteExc -> {
      synchronized (this) {
        closeConn(remoteExc);
      }
    });
  }

  private void createStreams(InputStream inStream, OutputStream outStream) {
    this.in = new DataInputStream(new BufferedInputStream(inStream));
    this.out = new DataOutputStream(new BufferedOutputStream(outStream));
  }

  // choose the server principal to use
  private String chooseServerPrincipal(InputStream inStream, OutputStream outStream)
    throws IOException {
    Set<String> serverPrincipals = getServerPrincipals();
    if (serverPrincipals.size() == 1) {
      return serverPrincipals.iterator().next();
    }
    // This means we use Kerberos authentication and there are multiple candidate server
    // principals, so we need to send a special preamble header to get the server principal from
    // the server.
    Call securityPreambleCall = createSecurityPreambleCall(r -> {
    });
    outStream.write(RpcClient.SECURITY_PREAMBLE_HEADER);
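    // Read the server's reply to the security preamble; the error field of securityPreambleCall
    // is inspected below to decide which principal to use.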
    readResponse(new DataInputStream(inStream), calls, securityPreambleCall, remoteExc -> {
      synchronized (this) {
        closeConn(remoteExc);
      }
    });
    if (securityPreambleCall.error != null) {
      LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address,
        securityPreambleCall.error);
      if (ConnectionUtils.isUnexpectedPreambleHeaderException(securityPreambleCall.error)) {
        // This means we are connecting to an old server which does not support the security
        // preamble call, so we should fall back to randomly selecting a principal to use.
        // TODO: find a way to reconnect without failing all the pending calls; for now, when we
        // reach here, shutdown should have already been scheduled.
        throw securityPreambleCall.error;
      }
      if (IPCUtil.isSecurityNotEnabledException(securityPreambleCall.error)) {
        // The server tells us security is not enabled, so we should check whether falling back to
        // simple auth is allowed. If so, we just go without security, otherwise we should fail the
        // negotiation immediately.
        if (rpcClient.fallbackAllowed) {
          // TODO: just change the preamble and skip the fallback-to-simple logic; for now, just
          // select the first principal, which can finish the connection setup but wastes one
          // client message.
          return serverPrincipals.iterator().next();
        } else {
          throw new FallbackDisallowedException();
        }
      }
      return randomSelect(serverPrincipals);
    }
    return chooseServerPrincipal(serverPrincipals, securityPreambleCall);
  }

  private void setupIOstreams(Call connectionRegistryCall) throws IOException {
    if (socket != null) {
      // The connection is already available. Perfect.
      return;
    }

    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not trying to connect to " + remoteId.getAddress()
          + " this server is in the failed servers list");
      }
      throw new FailedServerException(
        "This server is in the failed servers list: " + remoteId.getAddress());
    }

    try {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Connecting to " + remoteId.getAddress());
      }

      short numRetries = 0;
      int reloginMaxRetries = this.rpcClient.conf.getInt("hbase.security.relogin.maxretries", 5);
      while (true) {
        setupConnection();
        InputStream inStream = NetUtils.getInputStream(socket);
        // This creates a socket with a write timeout. This timeout cannot be changed.
        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
        if (connectionRegistryCall != null) {
          getConnectionRegistry(inStream, outStream, connectionRegistryCall);
          closeSocket();
          return;
        }

        if (useSasl) {
          UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
          boolean continueSasl;
          if (ticket == null) {
            throw new FatalConnectionException("ticket/user is null");
          }
          String serverPrincipal = chooseServerPrincipal(inStream, outStream);
          // Write out the preamble -- MAGIC, version, and auth to use.
          writeConnectionHeaderPreamble(outStream);
          try {
            final InputStream in2 = inStream;
            final OutputStream out2 = outStream;
            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
              @Override
              public Boolean run() throws IOException {
                return setupSaslConnection(in2, out2, serverPrincipal);
              }
            });
          } catch (Exception ex) {
            ExceptionUtil.rethrowIfInterrupt(ex);
            saslNegotiationDone(serverPrincipal, false);
            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket,
              serverPrincipal);
            continue;
          }
          saslNegotiationDone(serverPrincipal, true);
          if (continueSasl) {
            // Sasl connect is successful. Let's set up Sasl i/o streams.
            inStream = saslRpcClient.getInputStream();
            outStream = saslRpcClient.getOutputStream();
          } else {
            // fall back to simple auth because server told us so.
            // do not change authMethod and useSasl here, we should start from secure when
            // reconnecting because regionserver may change its sasl config after restart.
            saslRpcClient = null;
          }
        } else {
          // Write out the preamble -- MAGIC, version, and auth to use.
          writeConnectionHeaderPreamble(outStream);
        }
        createStreams(inStream, outStream);
        // Now write out the connection header
        writeConnectionHeader();
        // process the response from server for connection header if necessary
        processResponseForConnectionHeader();
        break;
      }
    } catch (Throwable t) {
      closeSocket();
      IOException e = ExceptionUtil.asInterrupt(t);
      if (e == null) {
        this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t);
        if (t instanceof LinkageError) {
          // probably the hbase hadoop version does not match the running hadoop version
          e = new DoNotRetryIOException(t);
        } else if (t instanceof IOException) {
          e = (IOException) t;
        } else {
          e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t);
        }
      }
      throw e;
    }

    // start the receiver thread after the socket connection has been set up
    thread = new Thread(this, threadName + " (attempt: " + attempts.incrementAndGet() + ")");
    thread.setDaemon(true);
    thread.start();
  }

  /**
   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
   */
  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
    out.write(connectionHeaderPreamble);
    out.flush();
  }

  /**
   * Write the connection header.
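   * If SASL encryption (QOP privacy) and Crypto AES are both enabled, this also marks the
   * connection as waiting for the server's ConnectionHeaderResponse, which is consumed by
   * processResponseForConnectionHeader().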
   */
  private void writeConnectionHeader() throws IOException {
    boolean isCryptoAesEnable = false;
    // check if Crypto AES is enabled
    if (saslRpcClient != null) {
      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
        .equalsIgnoreCase(saslRpcClient.getSaslQOP());
      isCryptoAesEnable = saslEncryptionEnabled
        && conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
    }

    // if Crypto AES is enabled, set transformation and negotiate with server
    if (isCryptoAesEnable) {
      waitingConnectionHeaderResponse = true;
    }
    this.out.write(connectionHeaderWithLength);
    this.out.flush();
  }

  private void processResponseForConnectionHeader() throws IOException {
    // if no response expected, return
    if (!waitingConnectionHeaderResponse) return;
    try {
      // read the ConnectionHeaderResponse from server
      int len = this.in.readInt();
      byte[] buff = new byte[len];
      int readSize = this.in.read(buff);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Length of response for connection header:" + readSize);
      }

      RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
        RPCProtos.ConnectionHeaderResponse.parseFrom(buff);

      // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
      if (connectionHeaderResponse.hasCryptoCipherMeta()) {
        negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
      }
      waitingConnectionHeaderResponse = false;
    } catch (SocketTimeoutException ste) {
      LOG.error(HBaseMarkers.FATAL,
        "Can't get the connection header response for rpc timeout, "
          + "please check if server has the correct configuration to support the additional "
          + "function.",
        ste);
      // timed out while waiting for the connection header response; ignore the additional function
      throw new IOException("Timeout while waiting connection header response", ste);
    }
  }

  private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta) throws IOException {
    // initialize the Crypto AES with CryptoCipherMeta
    saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
    // reset the inputStream/outputStream for Crypto AES encryption
    this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
    this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
  }

  /**
   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
   * the Connection thread, but by other threads.
   * @see #readResponse()
   */
  private void writeRequest(Call call) throws IOException {
    ByteBuf cellBlock = null;
    try {
      cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
        call.cells, PooledByteBufAllocator.DEFAULT);
      CellBlockMeta cellBlockMeta;
      if (cellBlock != null) {
        cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
      } else {
        cellBlockMeta = null;
      }
      RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
      if (call.isConnectionRegistryCall()) {
        setupIOstreams(call);
        return;
      }
      setupIOstreams(null);

      // Now we're going to write the call. We take the lock, then check that the connection
      // is still valid, and, if so, we do the write to the socket. If the write fails, we don't
      // know where we stand, so we have to close the connection.
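      // If the calling thread was interrupted while waiting for the lock, bail out before
      // touching the socket.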
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      calls.put(call.id, call); // We put first as we don't want the connection to become idle.
      // from here, we do not throw any exception to the upper layer as the call has been tracked
      // in the pending calls map.
      try {
        call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
      } catch (Throwable t) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Error while writing {}", call.toShortString(), t);
        }
        IOException e = IPCUtil.toIOE(t);
        closeConn(e);
        return;
      }
    } finally {
      if (cellBlock != null) {
        cellBlock.release();
      }
    }
    notifyAll();
  }

  /*
   * Receive a response. Because there is only one receiver thread, no synchronization on "in" is
   * needed.
   */
  private void readResponse() {
    try {
      readResponse(in, calls, null, remoteExc -> {
        synchronized (this) {
          closeConn(remoteExc);
        }
      });
    } catch (IOException e) {
      if (e instanceof SocketTimeoutException) {
        // Clean up open calls but don't treat this as a fatal condition,
        // since we expect certain responses to not make it by the specified
        // {@link ConnectionId#rpcTimeout}.
        if (LOG.isTraceEnabled()) {
          LOG.trace("ignored", e);
        }
      } else {
        synchronized (this) {
          // The exception we received may have been caused by another thread closing
          // this connection. It's possible that before getting to this point, a new connection was
          // created. In that case, it doesn't help and can actually hurt to close again here.
          if (isCurrentThreadExpected()) {
            LOG.debug("Closing connection after error", e);
            closeConn(e);
          }
        }
      }
    }
  }

  /**
   * For use in the reader thread: tests if the current reader thread is the one expected to be
   * running. When closeConn is called, the reader thread is expected to end. setupIOstreams then
   * creates a new thread and updates the thread pointer. At that point, the new thread should be
   * the only one running. We use this method to guard against cases where the old thread may be
   * erroneously running or closing the connection in error states.
   */
  private boolean isCurrentThreadExpected() {
    return thread == Thread.currentThread();
  }

  @Override
  protected synchronized void callTimeout(Call call) {
    // call sender
    calls.remove(call.id);
  }

  // just close socket input and output.
  private void closeSocket() {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(socket);
    out = null;
    in = null;
    socket = null;
  }

  // close socket, reader, and clean up all pending calls.
  private void closeConn(IOException e) {
    if (thread == null) {
      return;
    }
    thread.interrupt();
    thread = null;
    closeSocket();
    if (callSender != null) {
      callSender.cleanup(e);
    }
    for (Call call : calls.values()) {
      call.setException(e);
    }
    calls.clear();
  }

  // release all resources; the connection will not be used any more.
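  // Interrupts the CallSender thread (if any) and, via closeConn, fails every pending call with
  // a "connection closed" exception.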
  @Override
  public synchronized void shutdown() {
    closed = true;
    if (callSender != null) {
      callSender.interrupt();
    }
    closeConn(new IOException("connection to " + remoteId.getAddress() + " closed"));
  }

  @Override
  public void cleanupConnection() {
    // do nothing
  }

  @Override
  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
    throws IOException {
    pcrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        synchronized (BlockingRpcConnection.this) {
          if (callSender != null) {
            callSender.remove(call);
          } else {
            calls.remove(call.id);
          }
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
          return;
        }
        scheduleTimeoutTask(call);
        if (callSender != null) {
          callSender.sendCall(call);
        } else {
          // this is in the same thread as the caller, so we do not need to attach the trace
          // context again.
          writeRequest(call);
        }
      }
    });
  }

  @Override
  public synchronized boolean isActive() {
    return thread != null;
  }
}