/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException;
import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited;
import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;

import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;

/**
 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
 * remote address. Calls are multiplexed through this socket: responses may be delivered out of
 * order.
 */
@InterfaceAudience.Private
class BlockingRpcConnection extends RpcConnection implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);

  private final BlockingRpcClient rpcClient;

  private final String threadName;
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "We are always under lock actually")
  private Thread thread;

  // connected socket. protected for writing UT.
  protected Socket socket = null;
  private DataInputStream in;
  private DataOutputStream out;

  private HBaseSaslRpcClient saslRpcClient;

  // currently active calls
  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();

  private final CallSender callSender;

  private boolean closed = false;

  private byte[] connectionHeaderPreamble;

  private byte[] connectionHeaderWithLength;

  private boolean waitingConnectionHeaderResponse = false;

  /**
   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
   * Java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
   * use a different thread for writing. This way, on interruptions, we either cancel the writes or
   * ignore the answer if the write is already done, but we don't stop the write in the middle. This
   * adds a thread per region server in the client, so it's kept as an option.
   * <p>
   * The implementation is simple: the client threads add their calls to the queue and then wait for
   * an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
   * interruption, the client cancels its call. The CallSender checks that the call has not been
   * canceled before writing it.
   * </p>
   * <p>
   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
   * notified with an appropriate exception, as if the call was already sent but the answer not yet
   * received.
   * </p>
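   * <p>
   * A minimal usage sketch (hypothetical client-side setup, not taken from this file) of how the
   * dedicated writer thread is switched on through the client configuration:
   * </p>
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * // opt in to one writer thread per connection; see the constructor below
   * conf.setBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, true);
   * </pre>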
   */
  private class CallSender extends Thread {

    private final Queue<Call> callsToWrite;

    private final int maxQueueSize;

    public CallSender(String name, Configuration conf) {
      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
      callsToWrite = new ArrayDeque<>(queueSize);
      this.maxQueueSize = queueSize;
      setDaemon(true);
      setName(name + " - writer");
    }

    public void sendCall(final Call call) throws IOException {
      if (callsToWrite.size() >= maxQueueSize) {
        throw new IOException("Can't add " + call.toShortString()
          + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
      }
      callsToWrite.offer(call);
      BlockingRpcConnection.this.notifyAll();
    }

    public void remove(Call call) {
      callsToWrite.remove(call);
      // By removing the call from the expected call list we make the list smaller, but it also
      // means that we don't know how many calls we cancelled.
      calls.remove(call.id);
      call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
        + call.timeout));
    }

    /**
     * Reads calls from the queue and writes them to the socket.
     */
    @Override
    public void run() {
      synchronized (BlockingRpcConnection.this) {
        while (!closed) {
          if (callsToWrite.isEmpty()) {
            // We should use another monitor object here for better performance since the read
            // thread also uses BlockingRpcConnection.this. But this makes the locking scheme more
            // complicated; we can do it later as an optimization.
            try {
              BlockingRpcConnection.this.wait();
            } catch (InterruptedException e) {
              // Restore interrupt status
              Thread.currentThread().interrupt();
            }
            // Check whether we need to quit by continuing the main loop instead of falling
            // through.
            continue;
          }
          Call call = callsToWrite.poll();
          if (call.isDone()) {
            continue;
          }
          try (Scope scope = call.span.makeCurrent()) {
            writeRequest(call);
          } catch (IOException e) {
            // An exception here means the call has not been added to the pendingCalls yet, so we
            // need to fail it on our own.
            LOG.debug("call write error for {}", call.toShortString());
            call.setException(e);
            closeConn(e);
          }
        }
      }
    }

    /**
     * Cleans up the calls not yet sent when we finish.
     */
    public void cleanup(IOException e) {
      IOException ie =
        new ConnectionClosingException("Connection to " + remoteId.getAddress() + " is closing.");
      for (Call call : callsToWrite) {
        call.setException(ie);
      }
      callsToWrite.clear();
    }
  }

  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
      rpcClient.metrics, rpcClient.connectionAttributes);
    this.rpcClient = rpcClient;
    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
    ConnectionHeader header = getConnectionHeader();
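    // A note on the bytes built below (based on the writes that follow): the connection header is
    // serialized once, prefixed with its 4-byte big-endian length as written by
    // DataOutputStream.writeInt, so writeConnectionHeader() can later send it in a single write.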
    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeInt(header.getSerializedSize());
    header.writeTo(dos);
    assert baos.size() == 4 + header.getSerializedSize();
    this.connectionHeaderWithLength = baos.getBuffer();

    UserGroupInformation ticket = remoteId.ticket.getUGI();
    this.threadName = "BRPC Connection (" + this.rpcClient.socketFactory.hashCode() + ") to "
      + remoteId.getAddress().toString()
      + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));

    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
      callSender = new CallSender(threadName, this.rpcClient.conf);
      callSender.start();
    } else {
      callSender = null;
    }
  }

  // protected for write UT.
  protected void setupConnection() throws IOException {
    short ioFailures = 0;
    short timeoutFailures = 0;
    while (true) {
      try {
        this.socket = this.rpcClient.socketFactory.createSocket();
        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
        if (this.rpcClient.localAddr != null) {
          this.socket.bind(this.rpcClient.localAddr);
        }
        InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
        NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
        this.socket.setSoTimeout(this.rpcClient.readTO);
        return;
      } catch (SocketTimeoutException toe) {
        /*
         * The max number of retries is 45, which amounts to 20s * 45 = 15 minutes of retries.
         */
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(toe));
        }
        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
      } catch (IOException ie) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Received exception in connection setup.\n" + StringUtils.stringifyException(ie));
        }
        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
      }
    }
  }

  /**
   * Handle connection failures. If the current number of retries is equal to the max number of
   * retries, stop retrying and throw the exception; otherwise back off N seconds and try connecting
   * again. This method is only called from inside setupIOstreams(), which is synchronized. Hence
   * the sleep is synchronized; the locks will be retained.
   * @param curRetries current number of retries
   * @param maxRetries max number of retries allowed
   * @param ioe        failure reason
   * @throws IOException if the max number of retries is reached
   */
  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
    throws IOException {
    closeSocket();

    // throw the exception if the maximum number of retries is reached
    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
      throw ioe;
    }

    // otherwise back off and retry
    try {
      Thread.sleep(this.rpcClient.failureSleep);
    } catch (InterruptedException ie) {
      ExceptionUtil.rethrowIfInterrupt(ie);
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping "
        + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s).");
    }
  }

  /*
   * Wait until someone signals us to start reading an RPC response, or until the connection has
   * been idle for too long, is marked as to be closed, or the client is marked as not running.
   * @return true if it is time to read a response; false otherwise.
   */
  private synchronized boolean waitForWork() {
    // Beware of the concurrent access to the calls map: we can add calls, but also remove them.
    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
    for (;;) {
      if (thread == null) {
        return false;
      }
      if (!calls.isEmpty()) {
        return true;
      }
      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
        closeConn(
          new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
        return false;
      }
      try {
        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
      } catch (InterruptedException e) {
        // Restore interrupt status
        Thread.currentThread().interrupt();
      }
    }
  }

  @Override
  public void run() {
    if (LOG.isTraceEnabled()) {
      LOG.trace(threadName + ": starting");
    }
    while (waitForWork()) {
      readResponse();
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(threadName + ": stopped");
    }
  }

  private void disposeSasl() {
    if (saslRpcClient != null) {
      saslRpcClient.dispose();
      saslRpcClient = null;
    }
  }

  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
    throws IOException {
    if (this.metrics != null) {
      this.metrics.incrNsLookups();
    }
    saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
      socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed,
      this.rpcClient.conf.get("hbase.rpc.protection",
        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
      this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
    return saslRpcClient.saslConnect(in2, out2);
  }

  /**
   * If multiple clients with the same principal try to connect to the same server at the same time,
   * the server assumes a replay attack is in progress. This is a feature of Kerberos. To work
   * around this, the client backs off randomly and tries to initiate the connection again. The
   * other problem is ticket expiry; to handle that, a relogin is attempted.
   * <p>
   * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()} method.
   * Some providers have the ability to obtain new credentials and then re-attempt to authenticate
   * with HBase services. Other providers will continue to fail if they failed the first time -- for
   * those, we want to fail-fast.
   * </p>
   */
  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
    final Exception ex, final UserGroupInformation user) throws IOException, InterruptedException {
    closeSocket();
    user.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException, InterruptedException {
        // A provider which failed authentication, but doesn't have the ability to relogin with
        // some external system (e.g. username/password, the password either works or it doesn't)
        if (!provider.canRetry()) {
          LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);
          if (ex instanceof RemoteException) {
            throw (RemoteException) ex;
          }
          if (ex instanceof SaslException) {
            String msg = "SASL authentication failed."
              + " The most likely cause is missing or invalid credentials.";
            throw new RuntimeException(msg, ex);
          }
          throw new IOException(ex);
        }

        // Other providers, like kerberos, could request a new ticket from a keytab. Let
        // them try again.
        if (currRetries < maxRetries) {
          LOG.debug("Exception encountered while connecting to the server " + remoteId.getAddress(),
            ex);

          // Invoke the provider to perform the relogin
          provider.relogin();

          // Get rid of any old state on the SaslClient
          disposeSasl();

          // Sleep for a random amount of time with millisecond granularity. We are sleeping with
          // the Connection lock held, but since this connection instance is the one being used to
          // connect to the server in question, that is okay.
          Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
          return null;
        } else {
          String msg =
            "Failed to initiate connection for " + UserGroupInformation.getLoginUser().getUserName()
              + " to " + securityInfo.getServerPrincipal();
          throw new IOException(msg, ex);
        }
      }
    });
  }

  private void setupIOstreams() throws IOException {
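    // Connection setup sequence, as implemented below: connect the socket, write the preamble,
    // run the optional SASL handshake, write the connection header, read the optional connection
    // header response, and finally start the reader thread.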
    if (socket != null) {
      // The connection is already available. Perfect.
      return;
    }

    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not trying to connect to " + remoteId.getAddress()
          + " this server is in the failed servers list");
      }
      throw new FailedServerException(
        "This server is in the failed servers list: " + remoteId.getAddress());
    }

    try {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Connecting to " + remoteId.getAddress());
      }

      short numRetries = 0;
      int reloginMaxRetries = this.rpcClient.conf.getInt("hbase.security.relogin.maxretries", 5);
      while (true) {
        setupConnection();
        InputStream inStream = NetUtils.getInputStream(socket);
        // This creates an output stream with a write timeout. The timeout cannot be changed.
        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
        // Write out the preamble -- MAGIC, version, and auth to use.
        writeConnectionHeaderPreamble(outStream);
        if (useSasl) {
          final InputStream in2 = inStream;
          final OutputStream out2 = outStream;
          UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
          boolean continueSasl;
          if (ticket == null) {
            throw new FatalConnectionException("ticket/user is null");
          }
          try {
            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
              @Override
              public Boolean run() throws IOException {
                return setupSaslConnection(in2, out2);
              }
            });
          } catch (Exception ex) {
            ExceptionUtil.rethrowIfInterrupt(ex);
            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket);
            continue;
          }
          if (continueSasl) {
            // Sasl connect is successful. Let's set up Sasl i/o streams.
            inStream = saslRpcClient.getInputStream();
            outStream = saslRpcClient.getOutputStream();
          } else {
            // fall back to simple auth because server told us so.
            // do not change authMethod and useSasl here, we should start from secure when
            // reconnecting because regionserver may change its sasl config after restart.
          }
        }
        this.in = new DataInputStream(new BufferedInputStream(inStream));
        this.out = new DataOutputStream(new BufferedOutputStream(outStream));
        // Now write out the connection header
        writeConnectionHeader();
        // process the response from server for connection header if necessary
        processResponseForConnectionHeader();

        break;
      }
    } catch (Throwable t) {
      closeSocket();
      IOException e = ExceptionUtil.asInterrupt(t);
      if (e == null) {
        this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t);
        if (t instanceof LinkageError) {
          // probably the hbase hadoop version does not match the running hadoop version
          e = new DoNotRetryIOException(t);
        } else if (t instanceof IOException) {
          e = (IOException) t;
        } else {
          e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t);
        }
      }
      throw e;
    }

    // start the receiver thread after the socket connection has been set up
    thread = new Thread(this, threadName);
    thread.setDaemon(true);
    thread.start();
  }

  /**
   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
   */
  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
    out.write(connectionHeaderPreamble);
    out.flush();
  }

  /**
   * Write the connection header.
   */
  private void writeConnectionHeader() throws IOException {
    boolean isCryptoAesEnable = false;
    // check if Crypto AES is enabled
    if (saslRpcClient != null) {
      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
        .equalsIgnoreCase(saslRpcClient.getSaslQOP());
      isCryptoAesEnable = saslEncryptionEnabled
        && conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
    }

    // if Crypto AES is enabled, set transformation and negotiate with server
    if (isCryptoAesEnable) {
      waitingConnectionHeaderResponse = true;
    }
    this.out.write(connectionHeaderWithLength);
    this.out.flush();
  }

  private void processResponseForConnectionHeader() throws IOException {
    // if no response expected, return
    if (!waitingConnectionHeaderResponse) return;
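    // The response read below is a 4-byte length followed by a serialized
    // ConnectionHeaderResponse, which may carry CryptoCipherMeta used to negotiate Crypto AES.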
    try {
      // read the ConnectionHeaderResponse from server
      int len = this.in.readInt();
      byte[] buff = new byte[len];
      int readSize = this.in.read(buff);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Length of response for connection header:" + readSize);
      }

      RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
        RPCProtos.ConnectionHeaderResponse.parseFrom(buff);

      // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
      if (connectionHeaderResponse.hasCryptoCipherMeta()) {
        negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
      }
      waitingConnectionHeaderResponse = false;
    } catch (SocketTimeoutException ste) {
      LOG.error(HBaseMarkers.FATAL,
        "Can't get the connection header response due to rpc timeout; "
          + "please check if the server has the correct configuration to support the additional "
          + "function.",
        ste);
      // timed out while waiting for the connection header response, so give up on the
      // additional function
      throw new IOException("Timeout while waiting for connection header response", ste);
    }
  }

  private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta) throws IOException {
    // initialize the Crypto AES with CryptoCipherMeta
    saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
    // reset the inputStream/outputStream for Crypto AES encryption
    this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
    this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
  }

  /**
   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
   * the Connection thread, but by other threads.
   * @see #readResponse()
   */
  private void writeRequest(Call call) throws IOException {
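    // Build the cell block (if any) and the request header, make sure the connection is up, then
    // register the call in the pending map before handing the bytes to IPCUtil.write.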
    ByteBuf cellBlock = null;
    try {
      cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
        call.cells, PooledByteBufAllocator.DEFAULT);
      CellBlockMeta cellBlockMeta;
      if (cellBlock != null) {
        cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
      } else {
        cellBlockMeta = null;
      }
      RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);

      setupIOstreams();

      // Now we're going to write the call. We take the lock, then check that the connection
      // is still valid, and, if so we do the write to the socket. If the write fails, we don't
      // know where we stand, we have to close the connection.
      if (Thread.interrupted()) {
        throw new InterruptedIOException();
      }

      calls.put(call.id, call); // We put first as we don't want the connection to become idle.
      // from here, we do not throw any exception to upper layer as the call has been tracked in
      // the pending calls map.
      try {
        call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
      } catch (Throwable t) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Error while writing {}", call.toShortString());
        }
        IOException e = IPCUtil.toIOE(t);
        closeConn(e);
        return;
      }
    } finally {
      if (cellBlock != null) {
        cellBlock.release();
      }
    }
    notifyAll();
  }

  /*
   * Receive a response. Because there is only one receiver, there is no synchronization on 'in'.
   */
  private void readResponse() {
    Call call = null;
    boolean expectedCall = false;
    try {
      // See HBaseServer.Call.setResponse for where we write out the response.
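      // Wire format of a response, as consumed by the reads below: an int with the total size,
      // a delimited ResponseHeader (call id, optional exception, optional CellBlockMeta), then,
      // when there is no exception, a delimited response Message followed by the raw cell block
      // bytes if a CellBlockMeta is present.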
      // Total size of the response. Unused. But we have to read it in anyway.
      int totalSize = in.readInt();

      // Read the header
      ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
      int id = responseHeader.getCallId();
      call = calls.remove(id); // call.done has to be set before leaving this method
      expectedCall = (call != null && !call.isDone());
      if (!expectedCall) {
        // So we got a response for which we have no corresponding 'call' here on the client-side.
        // We probably timed out waiting, cleaned up all references, and now the server decides
        // to return a response. There is nothing we can do w/ the response at this stage. Clean
        // out the wire of the response so it's out of the way and we can get other responses on
        // this connection.
        int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
        int whatIsLeftToRead = totalSize - readSoFar;
        IOUtils.skipFully(in, whatIsLeftToRead);
        if (call != null) {
          call.callStats.setResponseSizeBytes(totalSize);
          call.callStats
            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
        }
        return;
      }
      if (responseHeader.hasException()) {
        ExceptionResponse exceptionResponse = responseHeader.getException();
        RemoteException re = createRemoteException(exceptionResponse);
        call.setException(re);
        call.callStats.setResponseSizeBytes(totalSize);
        call.callStats
          .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
        if (isFatalConnectionException(exceptionResponse)) {
          synchronized (this) {
            closeConn(re);
          }
        }
      } else {
        Message value = null;
        if (call.responseDefaultType != null) {
          Message.Builder builder = call.responseDefaultType.newBuilderForType();
          ProtobufUtil.mergeDelimitedFrom(builder, in);
          value = builder.build();
        }
        CellScanner cellBlockScanner = null;
        if (responseHeader.hasCellBlockMeta()) {
          int size = responseHeader.getCellBlockMeta().getLength();
          byte[] cellBlock = new byte[size];
          IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
          cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
            this.compressor, cellBlock);
        }
        call.setResponse(value, cellBlockScanner);
        call.callStats.setResponseSizeBytes(totalSize);
        call.callStats
          .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
      }
    } catch (IOException e) {
      if (expectedCall) {
        call.setException(e);
      }
      if (e instanceof SocketTimeoutException) {
        // Clean up open calls but don't treat this as a fatal condition,
        // since we expect certain responses to not make it by the specified
        // {@link ConnectionId#rpcTimeout}.
        if (LOG.isTraceEnabled()) {
          LOG.trace("ignored", e);
        }
      } else {
        synchronized (this) {
          closeConn(e);
        }
      }
    }
  }

  @Override
  protected synchronized void callTimeout(Call call) {
    // Called when the call times out. Just drop it from the pending map; if the call is still
    // queued in the CallSender, the sender will skip it once the call is marked done.
    calls.remove(call.id);
  }

  // just close socket input and output.
  private void closeSocket() {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(socket);
    out = null;
    in = null;
    socket = null;
  }

  // close socket, reader, and clean up all pending calls.
  private void closeConn(IOException e) {
    if (thread == null) {
      return;
    }
    thread.interrupt();
    thread = null;
    closeSocket();
    if (callSender != null) {
      callSender.cleanup(e);
    }
    for (Call call : calls.values()) {
      call.setException(e);
    }
    calls.clear();
  }

  // release all resources, the connection will not be used any more.
  @Override
  public synchronized void shutdown() {
    closed = true;
    if (callSender != null) {
      callSender.interrupt();
    }
    closeConn(new IOException("connection to " + remoteId.getAddress() + " closed"));
  }

  @Override
  public void cleanupConnection() {
    // do nothing
  }

  @Override
  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
    throws IOException {
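    // Two callbacks are registered below: the first one runs if the call is cancelled later and
    // removes it from the CallSender queue or the pending map; the second one runs immediately
    // and either marks the call cancelled or schedules the timeout task and sends the request.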
    pcrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        synchronized (BlockingRpcConnection.this) {
          if (callSender != null) {
            callSender.remove(call);
          } else {
            calls.remove(call.id);
          }
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
          return;
        }
        scheduleTimeoutTask(call);
        if (callSender != null) {
          callSender.sendCall(call);
        } else {
          // This is in the same thread as the caller, so there is no need to attach the trace
          // context again.
          writeRequest(call);
        }
      }
    });
  }

  @Override
  public synchronized boolean isActive() {
    return thread != null;
  }
}