/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;

import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
 * remote address. Calls are multiplexed through this socket: responses may be delivered out of
 * order.
 */
@InterfaceAudience.Private
class BlockingRpcConnection extends RpcConnection implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);

  private final BlockingRpcClient rpcClient;

  private final String threadName;
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "We are always under lock actually")
  private Thread thread;

  // connected socket. protected for writing UT.
  protected Socket socket = null;
  private DataInputStream in;
  private DataOutputStream out;

  private HBaseSaslRpcClient saslRpcClient;

  // currently active calls
  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();

  private final CallSender callSender;

  private boolean closed = false;

  private byte[] connectionHeaderPreamble;

  private byte[] connectionHeaderWithLength;

  private boolean waitingConnectionHeaderResponse = false;

  /**
   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it runs into a
   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
   * use a different thread for writing. This way, on interruptions, we either cancel the writes or
   * ignore the answer if the write is already done, but we don't stop the write in the middle. This
   * adds a thread per region server in the client, so it's kept as an option.
   * <p>
   * The implementation is simple: each client thread adds its call to the queue and then waits for
   * an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
   * interruption, the client cancels its call. The CallSender checks that the call has not been
   * cancelled before writing it.
   * </p>
   * <p>
   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
   * notified with an appropriate exception, as if the call had already been sent but the answer not
   * yet received.
   * </p>
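   * <p>
   * A minimal, hedged sketch of how a client could opt in to this dedicated write thread; the
   * boolean key is referenced through {@code BlockingRpcClient.SPECIFIC_WRITE_THREAD} rather than
   * hard-coded, and the queue-size key is the one read in the constructor below:
   * </p>
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, true);
   * // optional: bound the writer queue (defaults to 1000)
   * conf.setInt("hbase.ipc.client.write.queueSize", 1000);
   * }</pre>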
   */
  private class CallSender extends Thread {

    private final Queue<Call> callsToWrite;

    private final int maxQueueSize;

    public CallSender(String name, Configuration conf) {
      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
      callsToWrite = new ArrayDeque<>(queueSize);
      this.maxQueueSize = queueSize;
      setDaemon(true);
      setName(name + " - writer");
    }

    public void sendCall(final Call call) throws IOException {
      if (callsToWrite.size() >= maxQueueSize) {
        throw new IOException("Can't add " + call.toShortString()
          + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
      }
      callsToWrite.offer(call);
      BlockingRpcConnection.this.notifyAll();
    }

    public void remove(Call call) {
      callsToWrite.remove(call);
      // By removing the call from the expected call list, we make the list smaller, but
      // it also means that we don't know how many calls we cancelled.
      calls.remove(call.id);
      call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
        + call.timeout));
    }

    /**
     * Reads calls from the queue and writes them to the socket.
     */
    @Override
    public void run() {
      synchronized (BlockingRpcConnection.this) {
        while (!closed) {
          if (callsToWrite.isEmpty()) {
            // We should use another monitor object here for better performance since the read
            // thread also waits on BlockingRpcConnection.this. But this makes the locking scheme
            // more complicated; we can do it later as an optimization.
            try {
              BlockingRpcConnection.this.wait();
            } catch (InterruptedException e) {
              // Restore interrupt status
              Thread.currentThread().interrupt();
            }
            // check if we need to quit, so continue the main loop instead of falling through.
            continue;
          }
          Call call = callsToWrite.poll();
          if (call.isDone()) {
            continue;
          }
          try (Scope scope = call.span.makeCurrent()) {
            writeRequest(call);
          } catch (IOException e) {
            // an exception here means the call has not been added to the pendingCalls yet, so we
            // need to fail it on our own.
            LOG.debug("call write error for {}", call.toShortString());
            call.setException(e);
            closeConn(e);
          }
        }
      }
    }

    /**
     * Cleans up the calls not yet sent when the connection is closing.
     */
    public void cleanup(IOException e) {
      IOException ie =
        new ConnectionClosingException("Connection to " + remoteId.getAddress() + " is closing.");
      for (Call call : callsToWrite) {
        call.setException(ie);
      }
      callsToWrite.clear();
    }
  }

  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.providers, rpcClient.codec,
      rpcClient.compressor, rpcClient.cellBlockBuilder, rpcClient.metrics,
      rpcClient.connectionAttributes);
    this.rpcClient = rpcClient;
    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
    ConnectionHeader header = getConnectionHeader();
    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeInt(header.getSerializedSize());
    header.writeTo(dos);
    assert baos.size() == 4 + header.getSerializedSize();
    this.connectionHeaderWithLength = baos.getBuffer();

    UserGroupInformation ticket = remoteId.ticket.getUGI();
    this.threadName = "BRPC Connection (" + this.rpcClient.socketFactory.hashCode() + ") to "
      + remoteId.getAddress().toString()
      + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));

    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
      callSender = new CallSender(threadName, this.rpcClient.conf);
      callSender.start();
    } else {
      callSender = null;
    }
  }

  // protected for write UT.
  protected void setupConnection() throws IOException {
    short ioFailures = 0;
    short timeoutFailures = 0;
    while (true) {
      try {
        this.socket = this.rpcClient.socketFactory.createSocket();
        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
        if (this.rpcClient.localAddr != null) {
          this.socket.bind(this.rpcClient.localAddr);
        }
        InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
        NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
        this.socket.setSoTimeout(this.rpcClient.readTO);
        return;
      } catch (SocketTimeoutException toe) {
        /*
         * The max number of retries is 45, which amounts to 20s * 45 = 15 minutes of retries.
         */
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(toe));
        }
        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
      } catch (IOException ie) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(ie));
        }
        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
      }
    }
  }

  /**
   * Handle connection failures. If the current number of retries is equal to the max number of
   * retries, stop retrying and throw the exception; otherwise back off for a while and try
   * connecting again. This method is only called from inside setupIOstreams(), which is
   * synchronized. Hence the sleep is synchronized; the locks will be retained.
   * @param curRetries current number of retries
   * @param maxRetries max number of retries allowed
   * @param ioe        failure reason
   * @throws IOException if the max number of retries is reached
   */
  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
    throws IOException {
    closeSocket();

    // throw the exception if the maximum number of retries is reached
    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
      throw ioe;
    }

    // otherwise back off and retry
    try {
      Thread.sleep(this.rpcClient.failureSleep);
    } catch (InterruptedException ie) {
      ExceptionUtil.rethrowIfInterrupt(ie);
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping "
        + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s).");
    }
  }
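
  // Back-of-the-envelope arithmetic for the retry loop above, using the illustrative figures from
  // the comment in setupConnection() (they are not configuration defaults): at roughly 20s per
  // timed-out attempt and 45 retries, a dead server is given up on after about 20s * 45 = 15
  // minutes, plus the failureSleep pause between attempts.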

  /*
   * Wait until someone signals us to start reading an RPC response, or until the connection has
   * been idle too long, is marked to be closed, or the client is marked as not running.
   * @return true if it is time to read a response; false otherwise.
   */
  private synchronized boolean waitForWork() {
    // beware of the concurrent access to the calls list: we can add calls, but as well
    // remove them.
    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
    for (;;) {
      if (thread == null) {
        return false;
      }
      if (!calls.isEmpty()) {
        return true;
      }
      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
        closeConn(
          new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
        return false;
      }
      try {
        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
      } catch (InterruptedException e) {
        // Restore interrupt status
        Thread.currentThread().interrupt();
      }
    }
  }

  @Override
  public void run() {
    if (LOG.isTraceEnabled()) {
      LOG.trace(threadName + ": starting");
    }
    while (waitForWork()) {
      readResponse();
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(threadName + ": stopped");
    }
  }

  private void disposeSasl() {
    if (saslRpcClient != null) {
      saslRpcClient.dispose();
      saslRpcClient = null;
    }
  }

  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2,
    String serverPrincipal) throws IOException {
    if (this.metrics != null) {
      this.metrics.incrNsLookups();
    }
    saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
      socket.getInetAddress(), serverPrincipal, this.rpcClient.fallbackAllowed,
      this.rpcClient.conf.get("hbase.rpc.protection",
        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
      this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
    return saslRpcClient.saslConnect(in2, out2);
  }
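
  // A hedged configuration sketch for the SASL knobs read above. The key and the enum-derived
  // values mirror what this method passes to HBaseSaslRpcClient (lower-cased QualityOfProtection
  // names); treat the exact accepted values and defaults as version-dependent assumptions.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set("hbase.rpc.protection", "privacy");   // default above is "authentication"
  //   conf.setBoolean(CRYPTO_AES_ENABLED_KEY, true); // optionally layer Crypto AES on top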

  /**
   * If multiple clients with the same principal try to connect to the same server at the same time,
   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
   * work around this, what is done is that the client backs off randomly and tries to initiate the
   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
   * attempted.
   * <p>
   * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()} method.
   * Some providers have the ability to obtain new credentials and then re-attempt to authenticate
   * with HBase services. Other providers will continue to fail if they failed the first time -- for
   * those, we want to fail-fast.
   * </p>
   */
  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
    final Exception ex, final UserGroupInformation user, final String serverPrincipal)
    throws IOException, InterruptedException {
    closeSocket();
    user.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException, InterruptedException {
        // A provider which failed authentication, but doesn't have the ability to relogin with
        // some external system (e.g. username/password, the password either works or it doesn't)
        if (!provider.canRetry()) {
          LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);
          if (ex instanceof RemoteException) {
            throw (RemoteException) ex;
          }
          if (ex instanceof SaslException) {
            String msg = "SASL authentication failed."
              + " The most likely cause is missing or invalid credentials.";
            throw new RuntimeException(msg, ex);
          }
          throw new IOException(ex);
        }

        // Other providers, like kerberos, could request a new ticket from a keytab. Let
        // them try again.
        if (currRetries < maxRetries) {
          LOG.debug("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);

          // Invoke the provider to perform the relogin
          provider.relogin();

          // Get rid of any old state on the SaslClient
          disposeSasl();

          // have granularity of milliseconds
          // we are sleeping with the Connection lock held but since this
          // connection instance is being used for connecting to the server
          // in question, it is okay
          Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
          return null;
        } else {
          String msg = "Failed to initiate connection for "
            + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
          throw new IOException(msg, ex);
        }
      }
    });
  }

  private void getConnectionRegistry(InputStream inStream, OutputStream outStream,
    Call connectionRegistryCall) throws IOException {
    outStream.write(RpcClient.REGISTRY_PREAMBLE_HEADER);
    readResponse(new DataInputStream(inStream), calls, connectionRegistryCall, remoteExc -> {
      synchronized (this) {
        closeConn(remoteExc);
      }
    });
  }

  private void createStreams(InputStream inStream, OutputStream outStream) {
    this.in = new DataInputStream(new BufferedInputStream(inStream));
    this.out = new DataOutputStream(new BufferedOutputStream(outStream));
  }

  // choose the server principal to use
  private String chooseServerPrincipal(InputStream inStream, OutputStream outStream)
    throws IOException {
    Set<String> serverPrincipals = getServerPrincipals();
    if (serverPrincipals.size() == 1) {
      return serverPrincipals.iterator().next();
    }
    // this means we use kerberos authentication and there are multiple server principal candidates,
    // in this case we need to send a special preamble header to get the server principal from the
    // server
    Call securityPreambleCall = createSecurityPreambleCall(r -> {
    });
    outStream.write(RpcClient.SECURITY_PREAMBLE_HEADER);
    readResponse(new DataInputStream(inStream), calls, securityPreambleCall, remoteExc -> {
      synchronized (this) {
        closeConn(remoteExc);
      }
    });
    if (securityPreambleCall.error != null) {
      LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address,
        securityPreambleCall.error);
      if (ConnectionUtils.isUnexpectedPreambleHeaderException(securityPreambleCall.error)) {
        // this means we are connecting to an old server which does not support the security
        // preamble call, so we should fall back to randomly selecting a principal to use
        // TODO: find a way to reconnect without failing all the pending calls, for now, when we
        // reach here, shutdown should have already been scheduled
        throw securityPreambleCall.error;
      }
      if (IPCUtil.isSecurityNotEnabledException(securityPreambleCall.error)) {
        // server tells us security is not enabled, then we should check whether fallback to
        // simple is allowed, if so we just go without security, otherwise we should fail the
        // negotiation immediately
        if (rpcClient.fallbackAllowed) {
          // TODO: just change the preamble and skip the fallback-to-simple logic; for now, just
          // selecting the first principal can finish the connection setup, but it wastes one
          // client message
          return serverPrincipals.iterator().next();
        } else {
          throw new FallbackDisallowedException();
        }
      }
      return randomSelect(serverPrincipals);
    }
    return chooseServerPrincipal(serverPrincipals, securityPreambleCall);
  }

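  // Rough order of operations in setupIOstreams() below, summarized for readability (the method
  // body is authoritative): connect the socket via setupConnection(); if this is a connection
  // registry call, service it over the raw streams and return; otherwise, for SASL, choose a
  // server principal, write the connection preamble, run the SASL negotiation (retrying through
  // handleSaslConnectionFailure if the provider allows it), wrap the streams, write the connection
  // header, process the optional connection header response, and finally start the reader thread.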
  private void setupIOstreams(Call connectionRegistryCall) throws IOException {
    if (socket != null) {
      // The connection is already available. Perfect.
      return;
    }

    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not trying to connect to " + remoteId.getAddress()
          + " this server is in the failed servers list");
      }
      throw new FailedServerException(
        "This server is in the failed servers list: " + remoteId.getAddress());
    }

    try {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Connecting to " + remoteId.getAddress());
      }

      short numRetries = 0;
      int reloginMaxRetries = this.rpcClient.conf.getInt("hbase.security.relogin.maxretries", 5);
      while (true) {
        setupConnection();
        InputStream inStream = NetUtils.getInputStream(socket);
        // This wraps the socket's output stream with a write timeout. The timeout cannot be changed.
        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
        if (connectionRegistryCall != null) {
          getConnectionRegistry(inStream, outStream, connectionRegistryCall);
          closeSocket();
          return;
        }

        if (useSasl) {
          UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
          boolean continueSasl;
          if (ticket == null) {
            throw new FatalConnectionException("ticket/user is null");
          }
          String serverPrincipal = chooseServerPrincipal(inStream, outStream);
          // Write out the preamble -- MAGIC, version, and auth to use.
          writeConnectionHeaderPreamble(outStream);
          try {
            final InputStream in2 = inStream;
            final OutputStream out2 = outStream;
            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
              @Override
              public Boolean run() throws IOException {
                return setupSaslConnection(in2, out2, serverPrincipal);
              }
            });
          } catch (Exception ex) {
            ExceptionUtil.rethrowIfInterrupt(ex);
            saslNegotiationDone(serverPrincipal, false);
            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket,
              serverPrincipal);
            continue;
          }
          saslNegotiationDone(serverPrincipal, true);
          if (continueSasl) {
            // Sasl connect is successful. Let's set up Sasl i/o streams.
            inStream = saslRpcClient.getInputStream();
            outStream = saslRpcClient.getOutputStream();
          } else {
            // fall back to simple auth because server told us so.
            // do not change authMethod and useSasl here, we should start from secure when
            // reconnecting because regionserver may change its sasl config after restart.
            saslRpcClient = null;
          }
        } else {
          // Write out the preamble -- MAGIC, version, and auth to use.
          writeConnectionHeaderPreamble(outStream);
        }
        createStreams(inStream, outStream);
        // Now write out the connection header
        writeConnectionHeader();
        // process the response from server for connection header if necessary
        processResponseForConnectionHeader();
        break;
      }
    } catch (Throwable t) {
      closeSocket();
      IOException e = ExceptionUtil.asInterrupt(t);
      if (e == null) {
        this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t);
        if (t instanceof LinkageError) {
          // probably the hbase hadoop version does not match the running hadoop version
          e = new DoNotRetryIOException(t);
        } else if (t instanceof IOException) {
          e = (IOException) t;
        } else {
          e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t);
        }
      }
      throw e;
    }

    // start the receiver thread after the socket connection has been set up
    thread = new Thread(this, threadName);
    thread.setDaemon(true);
    thread.start();
  }

  /**
   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
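   * <p>
   * For illustration, the six-byte preamble laid out in order (the auth-type value depends on the
   * negotiated authentication provider):
   * </p>
   * <pre>{@code
   * 'H' 'B' 'a' 's' <version byte> <auth-type byte>
   * }</pre>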
   */
  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
    out.write(connectionHeaderPreamble);
    out.flush();
  }

  /**
   * Write the connection header.
   */
  private void writeConnectionHeader() throws IOException {
    boolean isCryptoAesEnable = false;
    // check if Crypto AES is enabled
    if (saslRpcClient != null) {
      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
        .equalsIgnoreCase(saslRpcClient.getSaslQOP());
      isCryptoAesEnable = saslEncryptionEnabled
        && conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
    }

    // if Crypto AES is enabled, set transformation and negotiate with server
    if (isCryptoAesEnable) {
      waitingConnectionHeaderResponse = true;
    }
    this.out.write(connectionHeaderWithLength);
    this.out.flush();
  }

  private void processResponseForConnectionHeader() throws IOException {
    // if no response expected, return
    if (!waitingConnectionHeaderResponse) return;
    try {
      // read the ConnectionHeaderResponse from server
      int len = this.in.readInt();
      byte[] buff = new byte[len];
      // a plain read() may return fewer bytes than requested, so read the full response
      this.in.readFully(buff);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Length of response for connection header:" + len);
      }

      RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
        RPCProtos.ConnectionHeaderResponse.parseFrom(buff);

      // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
      if (connectionHeaderResponse.hasCryptoCipherMeta()) {
        negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
      }
      waitingConnectionHeaderResponse = false;
    } catch (SocketTimeoutException ste) {
      LOG.error(HBaseMarkers.FATAL,
        "Can't get the connection header response due to rpc timeout, "
          + "please check whether the server has the correct configuration to support the "
          + "additional function.",
        ste);
      // timed out while waiting for the connection header response; surface it to the caller
      throw new IOException("Timeout while waiting for connection header response", ste);
    }
  }

  private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta) throws IOException {
    // initialize the Crypto AES with CryptoCipherMeta
    saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
    // reset the inputStream/outputStream for Crypto AES encryption
    this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
    this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
  }

  /**
   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
   * the Connection thread, but by other threads.
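   * <p>
   * In outline (the method body is authoritative): build the optional cell block and its
   * {@code CellBlockMeta}, build the request header, make sure the streams are set up via
   * {@code setupIOstreams}, register the call in the pending {@code calls} map, write the header,
   * parameter and cell block to the output stream, and finally {@code notifyAll()} so the reader
   * thread picks up work.
   * </p>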
   * @see #readResponse()
   */
  private void writeRequest(Call call) throws IOException {
    ByteBuf cellBlock = null;
    try {
      cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
        call.cells, PooledByteBufAllocator.DEFAULT);
      CellBlockMeta cellBlockMeta;
      if (cellBlock != null) {
        cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
      } else {
        cellBlockMeta = null;
      }
      RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
      if (call.isConnectionRegistryCall()) {
        setupIOstreams(call);
        return;
      }
      setupIOstreams(null);

      // Now we're going to write the call. We take the lock, then check that the connection
      // is still valid and, if so, write to the socket. If the write fails, we don't know
      // where we stand, so we have to close the connection.
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      calls.put(call.id, call); // We put it first as we don't want the connection to become idle.
      // from here on, we do not throw any exception to the upper layer as the call has been
      // tracked in the pending calls map.
      try {
        call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
      } catch (Throwable t) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Error while writing {}", call.toShortString());
        }
        IOException e = IPCUtil.toIOE(t);
        closeConn(e);
        return;
      }
    } finally {
      if (cellBlock != null) {
        cellBlock.release();
      }
    }
    notifyAll();
  }

  /*
   * Receive a response. Since there is only one receiver thread, no synchronization is needed on
   * the input stream.
   */
  private void readResponse() {
    try {
      readResponse(in, calls, null, remoteExc -> {
        synchronized (this) {
          closeConn(remoteExc);
        }
      });
    } catch (IOException e) {
      if (e instanceof SocketTimeoutException) {
        // Clean up open calls but don't treat this as a fatal condition,
        // since we expect certain responses to not make it by the specified
        // {@link ConnectionId#rpcTimeout}.
        if (LOG.isTraceEnabled()) {
          LOG.trace("ignored", e);
        }
      } else {
        synchronized (this) {
          closeConn(e);
        }
      }
    }
  }

  @Override
  protected synchronized void callTimeout(Call call) {
    // just drop the timed out call from the pending calls map
    calls.remove(call.id);
  }

  // just close the socket's input and output streams, and the socket itself.
  private void closeSocket() {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(socket);
    out = null;
    in = null;
    socket = null;
  }

  // close socket, reader, and clean up all pending calls.
  private void closeConn(IOException e) {
    if (thread == null) {
      return;
    }
    thread.interrupt();
    thread = null;
    closeSocket();
    if (callSender != null) {
      callSender.cleanup(e);
    }
    for (Call call : calls.values()) {
      call.setException(e);
    }
    calls.clear();
  }

  // release all resources, the connection will not be used any more.
  @Override
  public synchronized void shutdown() {
    closed = true;
    if (callSender != null) {
      callSender.interrupt();
    }
    closeConn(new IOException("connection to " + remoteId.getAddress() + " closed"));
  }

  @Override
  public void cleanupConnection() {
    // do nothing
  }

  @Override
  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
    throws IOException {
    pcrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        synchronized (BlockingRpcConnection.this) {
          if (callSender != null) {
            callSender.remove(call);
          } else {
            calls.remove(call.id);
          }
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
          return;
        }
        scheduleTimeoutTask(call);
        if (callSender != null) {
          callSender.sendCall(call);
        } else {
          // this runs in the same thread as the caller, so there is no need to attach the trace
          // context again.
          writeRequest(call);
        }
      }
    });
  }

  @Override
  public synchronized boolean isActive() {
    return thread != null;
  }
}