001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
021import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited;
023import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
024import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
025import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
026
027import java.io.BufferedInputStream;
028import java.io.BufferedOutputStream;
029import java.io.DataInputStream;
030import java.io.DataOutputStream;
031import java.io.IOException;
032import java.io.InputStream;
033import java.io.InterruptedIOException;
034import java.io.OutputStream;
035import java.net.Socket;
036import java.net.SocketTimeoutException;
037import java.net.UnknownHostException;
038import java.security.PrivilegedExceptionAction;
039import java.util.ArrayDeque;
040import java.util.Locale;
041import java.util.Queue;
042import java.util.concurrent.ConcurrentHashMap;
043import java.util.concurrent.ConcurrentMap;
044import java.util.concurrent.ThreadLocalRandom;
045import javax.security.sasl.SaslException;
046
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.hbase.CellScanner;
049import org.apache.hadoop.hbase.DoNotRetryIOException;
050import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
051import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
052import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
053import org.apache.hadoop.hbase.log.HBaseMarkers;
054import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
055import org.apache.hadoop.hbase.security.SaslUtil;
056import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
057import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
058import org.apache.hadoop.hbase.trace.TraceUtil;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.ExceptionUtil;
061import org.apache.hadoop.io.IOUtils;
062import org.apache.hadoop.ipc.RemoteException;
063import org.apache.hadoop.net.NetUtils;
064import org.apache.hadoop.security.UserGroupInformation;
065import org.apache.hadoop.util.StringUtils;
066import org.apache.htrace.core.TraceScope;
067import org.apache.yetus.audience.InterfaceAudience;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070import org.apache.hbase.thirdparty.com.google.protobuf.Message;
071import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
072import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
073import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
074import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
075import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
082
083/**
084 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
085 * remote address. Calls are multiplexed through this socket: responses may be delivered out of
086 * order.
087 */
088@InterfaceAudience.Private
089class BlockingRpcConnection extends RpcConnection implements Runnable {
090
091  private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);
092
093  private final BlockingRpcClient rpcClient;
094
095  private final String threadName;
096  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
097      justification = "We are always under lock actually")
098  private Thread thread;
099
100  // connected socket. protected for writing UT.
101  protected Socket socket = null;
102  private DataInputStream in;
103  private DataOutputStream out;
104
105  private HBaseSaslRpcClient saslRpcClient;
106
107  // currently active calls
108  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();
109
110  private final CallSender callSender;
111
112  private boolean closed = false;
113
114  private byte[] connectionHeaderPreamble;
115
116  private byte[] connectionHeaderWithLength;
117
118  private boolean waitingConnectionHeaderResponse = false;
119
120  /**
121   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
122   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
123   * use a different thread for writing. This way, on interruptions, we either cancel the writes or
124   * ignore the answer if the write is already done, but we don't stop the write in the middle. This
125   * adds a thread per region server in the client, so it's kept as an option.
126   * <p>
127   * The implementation is simple: the client threads adds their call to the queue, and then wait
128   * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
129   * interruption, the client cancels its call. The CallSender checks that the call has not been
130   * canceled before writing it.
131   * </p>
132   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
133   * notified with an appropriate exception, as if the call was already sent but the answer not yet
134   * received.
135   * </p>
136   */
137  private class CallSender extends Thread {
138
139    private final Queue<Call> callsToWrite;
140
141    private final int maxQueueSize;
142
143    public CallSender(String name, Configuration conf) {
144      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
145      callsToWrite = new ArrayDeque<>(queueSize);
146      this.maxQueueSize = queueSize;
147      setDaemon(true);
148      setName(name + " - writer");
149    }
150
151    public void sendCall(final Call call) throws IOException {
152      if (callsToWrite.size() >= maxQueueSize) {
153        throw new IOException("Can't add " + call.toShortString()
154            + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
155      }
156      callsToWrite.offer(call);
157      BlockingRpcConnection.this.notifyAll();
158    }
159
160    public void remove(Call call) {
161      callsToWrite.remove(call);
162      // By removing the call from the expected call list, we make the list smaller, but
163      // it means as well that we don't know how many calls we cancelled.
164      calls.remove(call.id);
165      call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
166          + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
167          + call.timeout));
168    }
169
170    /**
171     * Reads the call from the queue, write them on the socket.
172     */
173    @Override
174    public void run() {
175      synchronized (BlockingRpcConnection.this) {
176        while (!closed) {
177          if (callsToWrite.isEmpty()) {
178            // We should use another monitor object here for better performance since the read
179            // thread also uses ConnectionImpl.this. But this makes the locking schema more
180            // complicated, can do it later as an optimization.
181            try {
182              BlockingRpcConnection.this.wait();
183            } catch (InterruptedException e) {
184            }
185            // check if we need to quit, so continue the main loop instead of fallback.
186            continue;
187          }
188          Call call = callsToWrite.poll();
189          if (call.isDone()) {
190            continue;
191          }
192          try {
193            tracedWriteRequest(call);
194          } catch (IOException e) {
195            // exception here means the call has not been added to the pendingCalls yet, so we need
196            // to fail it by our own.
197            LOG.debug("call write error for {}", call.toShortString());
198            call.setException(e);
199            closeConn(e);
200          }
201        }
202      }
203    }
204
205    /**
206     * Cleans the call not yet sent when we finish.
207     */
208    public void cleanup(IOException e) {
209      IOException ie = new ConnectionClosingException(
210          "Connection to " + remoteId.address + " is closing.");
211      for (Call call : callsToWrite) {
212        call.setException(ie);
213      }
214      callsToWrite.clear();
215    }
216  }
217
218  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
219    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
220        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
221    this.rpcClient = rpcClient;
222    if (remoteId.getAddress().isUnresolved()) {
223      throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
224    }
225
226    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
227    ConnectionHeader header = getConnectionHeader();
228    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
229    DataOutputStream dos = new DataOutputStream(baos);
230    dos.writeInt(header.getSerializedSize());
231    header.writeTo(dos);
232    assert baos.size() == 4 + header.getSerializedSize();
233    this.connectionHeaderWithLength = baos.getBuffer();
234
235    UserGroupInformation ticket = remoteId.ticket.getUGI();
236    this.threadName = "BRPC Connection (" + this.rpcClient.socketFactory.hashCode() + ") to "
237        + remoteId.getAddress().toString()
238        + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));
239
240    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
241      callSender = new CallSender(threadName, this.rpcClient.conf);
242      callSender.start();
243    } else {
244      callSender = null;
245    }
246  }
247
248  // protected for write UT.
249  protected void setupConnection() throws IOException {
250    short ioFailures = 0;
251    short timeoutFailures = 0;
252    while (true) {
253      try {
254        this.socket = this.rpcClient.socketFactory.createSocket();
255        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
256        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
257        if (this.rpcClient.localAddr != null) {
258          this.socket.bind(this.rpcClient.localAddr);
259        }
260        NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO);
261        this.socket.setSoTimeout(this.rpcClient.readTO);
262        return;
263      } catch (SocketTimeoutException toe) {
264        /*
265         * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries.
266         */
267        if (LOG.isDebugEnabled()) {
268          LOG.debug("Received exception in connection setup.\n" +
269              StringUtils.stringifyException(toe));
270        }
271        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
272      } catch (IOException ie) {
273        if (LOG.isDebugEnabled()) {
274          LOG.debug("Received exception in connection setup.\n" +
275              StringUtils.stringifyException(ie));
276        }
277        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
278      }
279    }
280  }
281
282  /**
283   * Handle connection failures If the current number of retries is equal to the max number of
284   * retries, stop retrying and throw the exception; Otherwise backoff N seconds and try connecting
285   * again. This Method is only called from inside setupIOstreams(), which is synchronized. Hence
286   * the sleep is synchronized; the locks will be retained.
287   * @param curRetries current number of retries
288   * @param maxRetries max number of retries allowed
289   * @param ioe failure reason
290   * @throws IOException if max number of retries is reached
291   */
292  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
293      throws IOException {
294    closeSocket();
295
296    // throw the exception if the maximum number of retries is reached
297    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
298      throw ioe;
299    }
300
301    // otherwise back off and retry
302    try {
303      Thread.sleep(this.rpcClient.failureSleep);
304    } catch (InterruptedException ie) {
305      ExceptionUtil.rethrowIfInterrupt(ie);
306    }
307
308    if (LOG.isInfoEnabled()) {
309      LOG.info("Retrying connect to server: " + remoteId.getAddress() +
310        " after sleeping " + this.rpcClient.failureSleep + "ms. Already tried " + curRetries +
311        " time(s).");
312    }
313  }
314
315  /*
316   * wait till someone signals us to start reading RPC response or it is idle too long, it is marked
317   * as to be closed, or the client is marked as not running.
318   * @return true if it is time to read a response; false otherwise.
319   */
320  private synchronized boolean waitForWork() {
321    // beware of the concurrent access to the calls list: we can add calls, but as well
322    // remove them.
323    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
324    for (;;) {
325      if (thread == null) {
326        return false;
327      }
328      if (!calls.isEmpty()) {
329        return true;
330      }
331      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
332        closeConn(
333          new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
334        return false;
335      }
336      try {
337        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
338      } catch (InterruptedException e) {
339      }
340    }
341  }
342
343  @Override
344  public void run() {
345    if (LOG.isTraceEnabled()) {
346      LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size());
347    }
348    while (waitForWork()) {
349      readResponse();
350    }
351    if (LOG.isTraceEnabled()) {
352      LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size());
353    }
354  }
355
356  private void disposeSasl() {
357    if (saslRpcClient != null) {
358      saslRpcClient.dispose();
359      saslRpcClient = null;
360    }
361  }
362
363  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
364      throws IOException {
365    saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
366        serverAddress, securityInfo, this.rpcClient.fallbackAllowed,
367        this.rpcClient.conf.get("hbase.rpc.protection",
368            QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
369        this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
370    return saslRpcClient.saslConnect(in2, out2);
371  }
372
373  /**
374   * If multiple clients with the same principal try to connect to the same server at the same time,
375   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
376   * work around this, what is done is that the client backs off randomly and tries to initiate the
377   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
378   * attempted.
379   * <p>
380   * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()}
381   * method. Some providers have the ability to obtain new credentials and then re-attempt to
382   * authenticate with HBase services. Other providers will continue to fail if they failed the
383   * first time -- for those, we want to fail-fast.
384   * </p>
385   */
386  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
387      final Exception ex, final UserGroupInformation user)
388      throws IOException, InterruptedException {
389    closeSocket();
390    user.doAs(new PrivilegedExceptionAction<Object>() {
391      @Override
392      public Object run() throws IOException, InterruptedException {
393        // A provider which failed authentication, but doesn't have the ability to relogin with
394        // some external system (e.g. username/password, the password either works or it doesn't)
395        if (!provider.canRetry()) {
396          LOG.warn("Exception encountered while connecting to the server : " + ex);
397          if (ex instanceof RemoteException) {
398            throw (RemoteException) ex;
399          }
400          if (ex instanceof SaslException) {
401            String msg = "SASL authentication failed."
402                + " The most likely cause is missing or invalid credentials.";
403            throw new RuntimeException(msg, ex);
404          }
405          throw new IOException(ex);
406        }
407
408        // Other providers, like kerberos, could request a new ticket from a keytab. Let
409        // them try again.
410        if (currRetries < maxRetries) {
411          LOG.debug("Exception encountered while connecting to the server", ex);
412
413          // Invoke the provider to perform the relogin
414          provider.relogin();
415
416          // Get rid of any old state on the SaslClient
417          disposeSasl();
418
419          // have granularity of milliseconds
420          // we are sleeping with the Connection lock held but since this
421          // connection instance is being used for connecting to the server
422          // in question, it is okay
423          Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
424          return null;
425        } else {
426          String msg = "Failed to initiate connection for "
427              + UserGroupInformation.getLoginUser().getUserName() + " to "
428              + securityInfo.getServerPrincipal();
429          throw new IOException(msg, ex);
430        }
431      }
432    });
433  }
434
435  private void setupIOstreams() throws IOException {
436    if (socket != null) {
437      // The connection is already available. Perfect.
438      return;
439    }
440
441    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
442      if (LOG.isDebugEnabled()) {
443        LOG.debug("Not trying to connect to " + remoteId.address
444            + " this server is in the failed servers list");
445      }
446      throw new FailedServerException(
447          "This server is in the failed servers list: " + remoteId.address);
448    }
449
450    try {
451      if (LOG.isDebugEnabled()) {
452        LOG.debug("Connecting to " + remoteId.address);
453      }
454
455      short numRetries = 0;
456      int reloginMaxRetries = this.rpcClient.conf.getInt("hbase.security.relogin.maxretries", 5);
457      while (true) {
458        setupConnection();
459        InputStream inStream = NetUtils.getInputStream(socket);
460        // This creates a socket with a write timeout. This timeout cannot be changed.
461        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
462        // Write out the preamble -- MAGIC, version, and auth to use.
463        writeConnectionHeaderPreamble(outStream);
464        if (useSasl) {
465          final InputStream in2 = inStream;
466          final OutputStream out2 = outStream;
467          UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
468          boolean continueSasl;
469          if (ticket == null) {
470            throw new FatalConnectionException("ticket/user is null");
471          }
472          try {
473            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
474              @Override
475              public Boolean run() throws IOException {
476                return setupSaslConnection(in2, out2);
477              }
478            });
479          } catch (Exception ex) {
480            ExceptionUtil.rethrowIfInterrupt(ex);
481            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket);
482            continue;
483          }
484          if (continueSasl) {
485            // Sasl connect is successful. Let's set up Sasl i/o streams.
486            inStream = saslRpcClient.getInputStream();
487            outStream = saslRpcClient.getOutputStream();
488          } else {
489            // fall back to simple auth because server told us so.
490            // do not change authMethod and useSasl here, we should start from secure when
491            // reconnecting because regionserver may change its sasl config after restart.
492          }
493        }
494        this.in = new DataInputStream(new BufferedInputStream(inStream));
495        this.out = new DataOutputStream(new BufferedOutputStream(outStream));
496        // Now write out the connection header
497        writeConnectionHeader();
498        // process the response from server for connection header if necessary
499        processResponseForConnectionHeader();
500
501        break;
502      }
503    } catch (Throwable t) {
504      closeSocket();
505      IOException e = ExceptionUtil.asInterrupt(t);
506      if (e == null) {
507        this.rpcClient.failedServers.addToFailedServers(remoteId.address, t);
508        if (t instanceof LinkageError) {
509          // probably the hbase hadoop version does not match the running hadoop version
510          e = new DoNotRetryIOException(t);
511        } else if (t instanceof IOException) {
512          e = (IOException) t;
513        } else {
514          e = new IOException("Could not set up IO Streams to " + remoteId.address, t);
515        }
516      }
517      throw e;
518    }
519
520    // start the receiver thread after the socket connection has been set up
521    thread = new Thread(this, threadName);
522    thread.setDaemon(true);
523    thread.start();
524  }
525
526  /**
527   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
528   */
529  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
530    out.write(connectionHeaderPreamble);
531    out.flush();
532  }
533
534  /**
535   * Write the connection header.
536   */
537  private void writeConnectionHeader() throws IOException {
538    boolean isCryptoAesEnable = false;
539    // check if Crypto AES is enabled
540    if (saslRpcClient != null) {
541      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.
542          getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP());
543      isCryptoAesEnable = saslEncryptionEnabled && conf.getBoolean(
544          CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
545    }
546
547    // if Crypto AES is enabled, set transformation and negotiate with server
548    if (isCryptoAesEnable) {
549      waitingConnectionHeaderResponse = true;
550    }
551    this.out.write(connectionHeaderWithLength);
552    this.out.flush();
553  }
554
555  private void processResponseForConnectionHeader() throws IOException {
556    // if no response excepted, return
557    if (!waitingConnectionHeaderResponse) return;
558    try {
559      // read the ConnectionHeaderResponse from server
560      int len = this.in.readInt();
561      byte[] buff = new byte[len];
562      int readSize = this.in.read(buff);
563      if (LOG.isDebugEnabled()) {
564        LOG.debug("Length of response for connection header:" + readSize);
565      }
566
567      RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
568          RPCProtos.ConnectionHeaderResponse.parseFrom(buff);
569
570      // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
571      if (connectionHeaderResponse.hasCryptoCipherMeta()) {
572        negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
573      }
574      waitingConnectionHeaderResponse = false;
575    } catch (SocketTimeoutException ste) {
576      LOG.error(HBaseMarkers.FATAL, "Can't get the connection header response for rpc timeout, "
577          + "please check if server has the correct configuration to support the additional "
578          + "function.", ste);
579      // timeout when waiting the connection header response, ignore the additional function
580      throw new IOException("Timeout while waiting connection header response", ste);
581    }
582  }
583
584  private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
585      throws IOException {
586    // initialize the Crypto AES with CryptoCipherMeta
587    saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
588    // reset the inputStream/outputStream for Crypto AES encryption
589    this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
590    this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
591  }
592
593  private void tracedWriteRequest(Call call) throws IOException {
594    try (TraceScope ignored = TraceUtil.createTrace("RpcClientImpl.tracedWriteRequest",
595          call.span)) {
596      writeRequest(call);
597    }
598  }
599
600  /**
601   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
602   * the Connection thread, but by other threads.
603   * @see #readResponse()
604   */
605  private void writeRequest(Call call) throws IOException {
606    ByteBuf cellBlock = null;
607    try {
608      cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
609          call.cells, PooledByteBufAllocator.DEFAULT);
610      CellBlockMeta cellBlockMeta;
611      if (cellBlock != null) {
612        cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
613      } else {
614        cellBlockMeta = null;
615      }
616      RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
617
618      setupIOstreams();
619
620      // Now we're going to write the call. We take the lock, then check that the connection
621      // is still valid, and, if so we do the write to the socket. If the write fails, we don't
622      // know where we stand, we have to close the connection.
623      if (Thread.interrupted()) {
624        throw new InterruptedIOException();
625      }
626
627      calls.put(call.id, call); // We put first as we don't want the connection to become idle.
628      // from here, we do not throw any exception to upper layer as the call has been tracked in
629      // the pending calls map.
630      try {
631        call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
632      } catch (Throwable t) {
633        if(LOG.isTraceEnabled()) {
634          LOG.trace("Error while writing {}", call.toShortString());
635        }
636        IOException e = IPCUtil.toIOE(t);
637        closeConn(e);
638        return;
639      }
640    } finally {
641      if (cellBlock != null) {
642        cellBlock.release();
643      }
644    }
645    notifyAll();
646  }
647
648  /*
649   * Receive a response. Because only one receiver, so no synchronization on in.
650   */
651  private void readResponse() {
652    Call call = null;
653    boolean expectedCall = false;
654    try {
655      // See HBaseServer.Call.setResponse for where we write out the response.
656      // Total size of the response. Unused. But have to read it in anyways.
657      int totalSize = in.readInt();
658
659      // Read the header
660      ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
661      int id = responseHeader.getCallId();
662      call = calls.remove(id); // call.done have to be set before leaving this method
663      expectedCall = (call != null && !call.isDone());
664      if (!expectedCall) {
665        // So we got a response for which we have no corresponding 'call' here on the client-side.
666        // We probably timed out waiting, cleaned up all references, and now the server decides
667        // to return a response. There is nothing we can do w/ the response at this stage. Clean
668        // out the wire of the response so its out of the way and we can get other responses on
669        // this connection.
670        int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
671        int whatIsLeftToRead = totalSize - readSoFar;
672        IOUtils.skipFully(in, whatIsLeftToRead);
673        if (call != null) {
674          call.callStats.setResponseSizeBytes(totalSize);
675          call.callStats
676              .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
677        }
678        return;
679      }
680      if (responseHeader.hasException()) {
681        ExceptionResponse exceptionResponse = responseHeader.getException();
682        RemoteException re = createRemoteException(exceptionResponse);
683        call.setException(re);
684        call.callStats.setResponseSizeBytes(totalSize);
685        call.callStats
686            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
687        if (isFatalConnectionException(exceptionResponse)) {
688          synchronized (this) {
689            closeConn(re);
690          }
691        }
692      } else {
693        Message value = null;
694        if (call.responseDefaultType != null) {
695          Builder builder = call.responseDefaultType.newBuilderForType();
696          ProtobufUtil.mergeDelimitedFrom(builder, in);
697          value = builder.build();
698        }
699        CellScanner cellBlockScanner = null;
700        if (responseHeader.hasCellBlockMeta()) {
701          int size = responseHeader.getCellBlockMeta().getLength();
702          byte[] cellBlock = new byte[size];
703          IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
704          cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
705            this.compressor, cellBlock);
706        }
707        call.setResponse(value, cellBlockScanner);
708        call.callStats.setResponseSizeBytes(totalSize);
709        call.callStats
710            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
711      }
712    } catch (IOException e) {
713      if (expectedCall) {
714        call.setException(e);
715      }
716      if (e instanceof SocketTimeoutException) {
717        // Clean up open calls but don't treat this as a fatal condition,
718        // since we expect certain responses to not make it by the specified
719        // {@link ConnectionId#rpcTimeout}.
720        if (LOG.isTraceEnabled()) {
721          LOG.trace("ignored", e);
722        }
723      } else {
724        synchronized (this) {
725          closeConn(e);
726        }
727      }
728    }
729  }
730
731  @Override
732  protected synchronized void callTimeout(Call call) {
733    // call sender
734    calls.remove(call.id);
735  }
736
737  // just close socket input and output.
738  private void closeSocket() {
739    IOUtils.closeStream(out);
740    IOUtils.closeStream(in);
741    IOUtils.closeSocket(socket);
742    out = null;
743    in = null;
744    socket = null;
745  }
746
747  // close socket, reader, and clean up all pending calls.
748  private void closeConn(IOException e) {
749    if (thread == null) {
750      return;
751    }
752    thread.interrupt();
753    thread = null;
754    closeSocket();
755    if (callSender != null) {
756      callSender.cleanup(e);
757    }
758    for (Call call : calls.values()) {
759      call.setException(e);
760    }
761    calls.clear();
762  }
763
764  // release all resources, the connection will not be used any more.
765  @Override
766  public synchronized void shutdown() {
767    closed = true;
768    if (callSender != null) {
769      callSender.interrupt();
770    }
771    closeConn(new IOException("connection to " + remoteId.address + " closed"));
772  }
773
774  @Override
775  public void cleanupConnection() {
776    // do nothing
777  }
778
779  @Override
780  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
781      throws IOException {
782    pcrc.notifyOnCancel(new RpcCallback<Object>() {
783
784      @Override
785      public void run(Object parameter) {
786        setCancelled(call);
787        synchronized (BlockingRpcConnection.this) {
788          if (callSender != null) {
789            callSender.remove(call);
790          } else {
791            calls.remove(call.id);
792          }
793        }
794      }
795    }, new CancellationCallback() {
796
797      @Override
798      public void run(boolean cancelled) throws IOException {
799        if (cancelled) {
800          setCancelled(call);
801          return;
802        }
803        scheduleTimeoutTask(call);
804        if (callSender != null) {
805          callSender.sendCall(call);
806        } else {
807          tracedWriteRequest(call);
808        }
809      }
810    });
811  }
812
813  @Override
814  public synchronized boolean isActive() {
815    return thread != null;
816  }
817}