001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
021import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited;
023import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
024import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
025import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
026
027import java.io.BufferedInputStream;
028import java.io.BufferedOutputStream;
029import java.io.DataInputStream;
030import java.io.DataOutputStream;
031import java.io.IOException;
032import java.io.InputStream;
033import java.io.InterruptedIOException;
034import java.io.OutputStream;
035import java.net.Socket;
036import java.net.SocketTimeoutException;
037import java.net.UnknownHostException;
038import java.nio.ByteBuffer;
039import java.security.PrivilegedExceptionAction;
040import java.util.ArrayDeque;
041import java.util.Locale;
042import java.util.Queue;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.ConcurrentMap;
045import java.util.concurrent.ThreadLocalRandom;
046import javax.security.sasl.SaslException;
047
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CellScanner;
050import org.apache.hadoop.hbase.DoNotRetryIOException;
051import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
052import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
053import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
054import org.apache.hadoop.hbase.log.HBaseMarkers;
055import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
056import org.apache.hadoop.hbase.security.SaslUtil;
057import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
058import org.apache.hadoop.hbase.trace.TraceUtil;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.ExceptionUtil;
061import org.apache.hadoop.io.IOUtils;
062import org.apache.hadoop.ipc.RemoteException;
063import org.apache.hadoop.net.NetUtils;
064import org.apache.hadoop.security.UserGroupInformation;
065import org.apache.hadoop.util.StringUtils;
066import org.apache.htrace.core.TraceScope;
067import org.apache.yetus.audience.InterfaceAudience;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070import org.apache.hbase.thirdparty.com.google.protobuf.Message;
071import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
072import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
073import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
080
081/**
082 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
083 * remote address. Calls are multiplexed through this socket: responses may be delivered out of
084 * order.
085 */
086@InterfaceAudience.Private
087class BlockingRpcConnection extends RpcConnection implements Runnable {
088
089  private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);
090
091  private final BlockingRpcClient rpcClient;
092
093  private final String threadName;
094  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
095      justification = "We are always under lock actually")
096  private Thread thread;
097
098  // connected socket. protected for writing UT.
099  protected Socket socket = null;
100  private DataInputStream in;
101  private DataOutputStream out;
102
103  private HBaseSaslRpcClient saslRpcClient;
104
105  // currently active calls
106  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();
107
108  private final CallSender callSender;
109
110  private boolean closed = false;
111
112  private byte[] connectionHeaderPreamble;
113
114  private byte[] connectionHeaderWithLength;
115
116  private boolean waitingConnectionHeaderResponse = false;
117
118  /**
119   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
120   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
121   * use a different thread for writing. This way, on interruptions, we either cancel the writes or
122   * ignore the answer if the write is already done, but we don't stop the write in the middle. This
123   * adds a thread per region server in the client, so it's kept as an option.
124   * <p>
125   * The implementation is simple: the client threads adds their call to the queue, and then wait
126   * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
127   * interruption, the client cancels its call. The CallSender checks that the call has not been
128   * canceled before writing it.
129   * </p>
130   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
131   * notified with an appropriate exception, as if the call was already sent but the answer not yet
132   * received.
133   * </p>
134   */
135  private class CallSender extends Thread {
136
137    private final Queue<Call> callsToWrite;
138
139    private final int maxQueueSize;
140
141    public CallSender(String name, Configuration conf) {
142      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
143      callsToWrite = new ArrayDeque<>(queueSize);
144      this.maxQueueSize = queueSize;
145      setDaemon(true);
146      setName(name + " - writer");
147    }
148
149    public void sendCall(final Call call) throws IOException {
150      if (callsToWrite.size() >= maxQueueSize) {
151        throw new IOException("Can't add the call " + call.id
152            + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
153      }
154      callsToWrite.offer(call);
155      BlockingRpcConnection.this.notifyAll();
156    }
157
158    public void remove(Call call) {
159      callsToWrite.remove(call);
160      // By removing the call from the expected call list, we make the list smaller, but
161      // it means as well that we don't know how many calls we cancelled.
162      calls.remove(call.id);
163      call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime="
164          + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
165          + call.timeout));
166    }
167
168    /**
169     * Reads the call from the queue, write them on the socket.
170     */
171    @Override
172    public void run() {
173      synchronized (BlockingRpcConnection.this) {
174        while (!closed) {
175          if (callsToWrite.isEmpty()) {
176            // We should use another monitor object here for better performance since the read
177            // thread also uses ConnectionImpl.this. But this makes the locking schema more
178            // complicated, can do it later as an optimization.
179            try {
180              BlockingRpcConnection.this.wait();
181            } catch (InterruptedException e) {
182            }
183            // check if we need to quit, so continue the main loop instead of fallback.
184            continue;
185          }
186          Call call = callsToWrite.poll();
187          if (call.isDone()) {
188            continue;
189          }
190          try {
191            tracedWriteRequest(call);
192          } catch (IOException e) {
193            // exception here means the call has not been added to the pendingCalls yet, so we need
194            // to fail it by our own.
195            if (LOG.isDebugEnabled()) {
196              LOG.debug("call write error for call #" + call.id, e);
197            }
198            call.setException(e);
199            closeConn(e);
200          }
201        }
202      }
203    }
204
205    /**
206     * Cleans the call not yet sent when we finish.
207     */
208    public void cleanup(IOException e) {
209      IOException ie = new ConnectionClosingException(
210          "Connection to " + remoteId.address + " is closing.");
211      for (Call call : callsToWrite) {
212        call.setException(ie);
213      }
214      callsToWrite.clear();
215    }
216  }
217
218  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
219    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
220        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
221    this.rpcClient = rpcClient;
222    if (remoteId.getAddress().isUnresolved()) {
223      throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
224    }
225
226    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
227    ConnectionHeader header = getConnectionHeader();
228    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
229    DataOutputStream dos = new DataOutputStream(baos);
230    dos.writeInt(header.getSerializedSize());
231    header.writeTo(dos);
232    assert baos.size() == 4 + header.getSerializedSize();
233    this.connectionHeaderWithLength = baos.getBuffer();
234
235    UserGroupInformation ticket = remoteId.ticket.getUGI();
236    this.threadName = "IPC Client (" + this.rpcClient.socketFactory.hashCode() + ") connection to "
237        + remoteId.getAddress().toString()
238        + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));
239
240    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
241      callSender = new CallSender(threadName, this.rpcClient.conf);
242      callSender.start();
243    } else {
244      callSender = null;
245    }
246  }
247
248  // protected for write UT.
249  protected void setupConnection() throws IOException {
250    short ioFailures = 0;
251    short timeoutFailures = 0;
252    while (true) {
253      try {
254        this.socket = this.rpcClient.socketFactory.createSocket();
255        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
256        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
257        if (this.rpcClient.localAddr != null) {
258          this.socket.bind(this.rpcClient.localAddr);
259        }
260        NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO);
261        this.socket.setSoTimeout(this.rpcClient.readTO);
262        return;
263      } catch (SocketTimeoutException toe) {
264        /*
265         * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries.
266         */
267        if (LOG.isDebugEnabled()) {
268          LOG.debug("Received exception in connection setup.\n" +
269              StringUtils.stringifyException(toe));
270        }
271        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
272      } catch (IOException ie) {
273        if (LOG.isDebugEnabled()) {
274          LOG.debug("Received exception in connection setup.\n" +
275              StringUtils.stringifyException(ie));
276        }
277        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
278      }
279    }
280  }
281
282  /**
283   * Handle connection failures If the current number of retries is equal to the max number of
284   * retries, stop retrying and throw the exception; Otherwise backoff N seconds and try connecting
285   * again. This Method is only called from inside setupIOstreams(), which is synchronized. Hence
286   * the sleep is synchronized; the locks will be retained.
287   * @param curRetries current number of retries
288   * @param maxRetries max number of retries allowed
289   * @param ioe failure reason
290   * @throws IOException if max number of retries is reached
291   */
292  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
293      throws IOException {
294    closeSocket();
295
296    // throw the exception if the maximum number of retries is reached
297    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
298      throw ioe;
299    }
300
301    // otherwise back off and retry
302    try {
303      Thread.sleep(this.rpcClient.failureSleep);
304    } catch (InterruptedException ie) {
305      ExceptionUtil.rethrowIfInterrupt(ie);
306    }
307
308    if (LOG.isInfoEnabled()) {
309      LOG.info("Retrying connect to server: " + remoteId.getAddress() +
310        " after sleeping " + this.rpcClient.failureSleep + "ms. Already tried " + curRetries +
311        " time(s).");
312    }
313  }
314
315  /*
316   * wait till someone signals us to start reading RPC response or it is idle too long, it is marked
317   * as to be closed, or the client is marked as not running.
318   * @return true if it is time to read a response; false otherwise.
319   */
320  private synchronized boolean waitForWork() {
321    // beware of the concurrent access to the calls list: we can add calls, but as well
322    // remove them.
323    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
324    for (;;) {
325      if (thread == null) {
326        return false;
327      }
328      if (!calls.isEmpty()) {
329        return true;
330      }
331      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
332        closeConn(
333          new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
334        return false;
335      }
336      try {
337        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
338      } catch (InterruptedException e) {
339      }
340    }
341  }
342
343  @Override
344  public void run() {
345    if (LOG.isTraceEnabled()) {
346      LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size());
347    }
348    while (waitForWork()) {
349      readResponse();
350    }
351    if (LOG.isTraceEnabled()) {
352      LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size());
353    }
354  }
355
356  private void disposeSasl() {
357    if (saslRpcClient != null) {
358      saslRpcClient.dispose();
359      saslRpcClient = null;
360    }
361  }
362
363  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
364      throws IOException {
365    saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal,
366        this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection",
367          QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
368        this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
369    return saslRpcClient.saslConnect(in2, out2);
370  }
371
372  /**
373   * If multiple clients with the same principal try to connect to the same server at the same time,
374   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
375   * work around this, what is done is that the client backs off randomly and tries to initiate the
376   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
377   * attempted.
378   * <p>
379   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
380   * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
381   * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
382   * underlying authentication implementation, so there is no retry from other high level (for eg,
383   * HCM or HBaseAdmin).
384   * </p>
385   */
386  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
387      final Exception ex, final UserGroupInformation user)
388      throws IOException, InterruptedException {
389    closeSocket();
390    user.doAs(new PrivilegedExceptionAction<Object>() {
391      @Override
392      public Object run() throws IOException, InterruptedException {
393        if (shouldAuthenticateOverKrb()) {
394          if (currRetries < maxRetries) {
395            if (LOG.isDebugEnabled()) {
396              LOG.debug("Exception encountered while connecting to " +
397                "the server : " + StringUtils.stringifyException(ex));
398            }
399            // try re-login
400            relogin();
401            disposeSasl();
402            // have granularity of milliseconds
403            // we are sleeping with the Connection lock held but since this
404            // connection instance is being used for connecting to the server
405            // in question, it is okay
406            Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
407            return null;
408          } else {
409            String msg = "Couldn't setup connection for "
410                + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
411            LOG.warn(msg, ex);
412            throw (IOException) new IOException(msg).initCause(ex);
413          }
414        } else {
415          LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
416        }
417        if (ex instanceof RemoteException) {
418          throw (RemoteException) ex;
419        }
420        if (ex instanceof SaslException) {
421          String msg = "SASL authentication failed."
422              + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
423          LOG.error(HBaseMarkers.FATAL, msg, ex);
424          throw new RuntimeException(msg, ex);
425        }
426        throw new IOException(ex);
427      }
428    });
429  }
430
431  private void setupIOstreams() throws IOException {
432    if (socket != null) {
433      // The connection is already available. Perfect.
434      return;
435    }
436
437    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
438      if (LOG.isDebugEnabled()) {
439        LOG.debug("Not trying to connect to " + remoteId.address
440            + " this server is in the failed servers list");
441      }
442      throw new FailedServerException(
443          "This server is in the failed servers list: " + remoteId.address);
444    }
445
446    try {
447      if (LOG.isDebugEnabled()) {
448        LOG.debug("Connecting to " + remoteId.address);
449      }
450
451      short numRetries = 0;
452      final short MAX_RETRIES = 5;
453      while (true) {
454        setupConnection();
455        InputStream inStream = NetUtils.getInputStream(socket);
456        // This creates a socket with a write timeout. This timeout cannot be changed.
457        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
458        // Write out the preamble -- MAGIC, version, and auth to use.
459        writeConnectionHeaderPreamble(outStream);
460        if (useSasl) {
461          final InputStream in2 = inStream;
462          final OutputStream out2 = outStream;
463          UserGroupInformation ticket = getUGI();
464          boolean continueSasl;
465          if (ticket == null) {
466            throw new FatalConnectionException("ticket/user is null");
467          }
468          try {
469            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
470              @Override
471              public Boolean run() throws IOException {
472                return setupSaslConnection(in2, out2);
473              }
474            });
475          } catch (Exception ex) {
476            ExceptionUtil.rethrowIfInterrupt(ex);
477            handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, ticket);
478            continue;
479          }
480          if (continueSasl) {
481            // Sasl connect is successful. Let's set up Sasl i/o streams.
482            inStream = saslRpcClient.getInputStream();
483            outStream = saslRpcClient.getOutputStream();
484          } else {
485            // fall back to simple auth because server told us so.
486            // do not change authMethod and useSasl here, we should start from secure when
487            // reconnecting because regionserver may change its sasl config after restart.
488          }
489        }
490        this.in = new DataInputStream(new BufferedInputStream(inStream));
491        this.out = new DataOutputStream(new BufferedOutputStream(outStream));
492        // Now write out the connection header
493        writeConnectionHeader();
494        // process the response from server for connection header if necessary
495        processResponseForConnectionHeader();
496
497        break;
498      }
499    } catch (Throwable t) {
500      closeSocket();
501      IOException e = ExceptionUtil.asInterrupt(t);
502      if (e == null) {
503        this.rpcClient.failedServers.addToFailedServers(remoteId.address, t);
504        if (t instanceof LinkageError) {
505          // probably the hbase hadoop version does not match the running hadoop version
506          e = new DoNotRetryIOException(t);
507        } else if (t instanceof IOException) {
508          e = (IOException) t;
509        } else {
510          e = new IOException("Could not set up IO Streams to " + remoteId.address, t);
511        }
512      }
513      throw e;
514    }
515
516    // start the receiver thread after the socket connection has been set up
517    thread = new Thread(this, threadName);
518    thread.setDaemon(true);
519    thread.start();
520  }
521
522  /**
523   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
524   */
525  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
526    out.write(connectionHeaderPreamble);
527    out.flush();
528  }
529
530  /**
531   * Write the connection header.
532   */
533  private void writeConnectionHeader() throws IOException {
534    boolean isCryptoAesEnable = false;
535    // check if Crypto AES is enabled
536    if (saslRpcClient != null) {
537      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.
538          getSaslQop().equalsIgnoreCase(saslRpcClient.getSaslQOP());
539      isCryptoAesEnable = saslEncryptionEnabled && conf.getBoolean(
540          CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
541    }
542
543    // if Crypto AES is enabled, set transformation and negotiate with server
544    if (isCryptoAesEnable) {
545      waitingConnectionHeaderResponse = true;
546    }
547    this.out.write(connectionHeaderWithLength);
548    this.out.flush();
549  }
550
551  private void processResponseForConnectionHeader() throws IOException {
552    // if no response excepted, return
553    if (!waitingConnectionHeaderResponse) return;
554    try {
555      // read the ConnectionHeaderResponse from server
556      int len = this.in.readInt();
557      byte[] buff = new byte[len];
558      int readSize = this.in.read(buff);
559      if (LOG.isDebugEnabled()) {
560        LOG.debug("Length of response for connection header:" + readSize);
561      }
562
563      RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
564          RPCProtos.ConnectionHeaderResponse.parseFrom(buff);
565
566      // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
567      if (connectionHeaderResponse.hasCryptoCipherMeta()) {
568        negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
569      }
570      waitingConnectionHeaderResponse = false;
571    } catch (SocketTimeoutException ste) {
572      LOG.error(HBaseMarkers.FATAL, "Can't get the connection header response for rpc timeout, "
573          + "please check if server has the correct configuration to support the additional "
574          + "function.", ste);
575      // timeout when waiting the connection header response, ignore the additional function
576      throw new IOException("Timeout while waiting connection header response", ste);
577    }
578  }
579
580  private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
581      throws IOException {
582    // initilize the Crypto AES with CryptoCipherMeta
583    saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
584    // reset the inputStream/outputStream for Crypto AES encryption
585    this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
586    this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
587  }
588
589  private void tracedWriteRequest(Call call) throws IOException {
590    try (TraceScope ignored = TraceUtil.createTrace("RpcClientImpl.tracedWriteRequest",
591          call.span)) {
592      writeRequest(call);
593    }
594  }
595
596  /**
597   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
598   * the Connection thread, but by other threads.
599   * @see #readResponse()
600   */
601  private void writeRequest(Call call) throws IOException {
602    ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
603      this.compressor, call.cells);
604    CellBlockMeta cellBlockMeta;
605    if (cellBlock != null) {
606      cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
607    } else {
608      cellBlockMeta = null;
609    }
610    RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
611
612    setupIOstreams();
613
614    // Now we're going to write the call. We take the lock, then check that the connection
615    // is still valid, and, if so we do the write to the socket. If the write fails, we don't
616    // know where we stand, we have to close the connection.
617    if (Thread.interrupted()) {
618      throw new InterruptedIOException();
619    }
620
621    calls.put(call.id, call); // We put first as we don't want the connection to become idle.
622    // from here, we do not throw any exception to upper layer as the call has been tracked in the
623    // pending calls map.
624    try {
625      call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
626    } catch (Throwable t) {
627      if(LOG.isTraceEnabled()) {
628        LOG.trace("Error while writing call, call_id:" + call.id, t);
629      }
630      IOException e = IPCUtil.toIOE(t);
631      closeConn(e);
632      return;
633    }
634    notifyAll();
635  }
636
637  /*
638   * Receive a response. Because only one receiver, so no synchronization on in.
639   */
640  private void readResponse() {
641    Call call = null;
642    boolean expectedCall = false;
643    try {
644      // See HBaseServer.Call.setResponse for where we write out the response.
645      // Total size of the response. Unused. But have to read it in anyways.
646      int totalSize = in.readInt();
647
648      // Read the header
649      ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
650      int id = responseHeader.getCallId();
651      call = calls.remove(id); // call.done have to be set before leaving this method
652      expectedCall = (call != null && !call.isDone());
653      if (!expectedCall) {
654        // So we got a response for which we have no corresponding 'call' here on the client-side.
655        // We probably timed out waiting, cleaned up all references, and now the server decides
656        // to return a response. There is nothing we can do w/ the response at this stage. Clean
657        // out the wire of the response so its out of the way and we can get other responses on
658        // this connection.
659        int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
660        int whatIsLeftToRead = totalSize - readSoFar;
661        IOUtils.skipFully(in, whatIsLeftToRead);
662        if (call != null) {
663          call.callStats.setResponseSizeBytes(totalSize);
664          call.callStats
665              .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
666        }
667        return;
668      }
669      if (responseHeader.hasException()) {
670        ExceptionResponse exceptionResponse = responseHeader.getException();
671        RemoteException re = createRemoteException(exceptionResponse);
672        call.setException(re);
673        call.callStats.setResponseSizeBytes(totalSize);
674        call.callStats
675            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
676        if (isFatalConnectionException(exceptionResponse)) {
677          synchronized (this) {
678            closeConn(re);
679          }
680        }
681      } else {
682        Message value = null;
683        if (call.responseDefaultType != null) {
684          Builder builder = call.responseDefaultType.newBuilderForType();
685          ProtobufUtil.mergeDelimitedFrom(builder, in);
686          value = builder.build();
687        }
688        CellScanner cellBlockScanner = null;
689        if (responseHeader.hasCellBlockMeta()) {
690          int size = responseHeader.getCellBlockMeta().getLength();
691          byte[] cellBlock = new byte[size];
692          IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
693          cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
694            this.compressor, cellBlock);
695        }
696        call.setResponse(value, cellBlockScanner);
697        call.callStats.setResponseSizeBytes(totalSize);
698        call.callStats
699            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
700      }
701    } catch (IOException e) {
702      if (expectedCall) {
703        call.setException(e);
704      }
705      if (e instanceof SocketTimeoutException) {
706        // Clean up open calls but don't treat this as a fatal condition,
707        // since we expect certain responses to not make it by the specified
708        // {@link ConnectionId#rpcTimeout}.
709        if (LOG.isTraceEnabled()) {
710          LOG.trace("ignored", e);
711        }
712      } else {
713        synchronized (this) {
714          closeConn(e);
715        }
716      }
717    }
718  }
719
720  @Override
721  protected synchronized void callTimeout(Call call) {
722    // call sender
723    calls.remove(call.id);
724  }
725
726  // just close socket input and output.
727  private void closeSocket() {
728    IOUtils.closeStream(out);
729    IOUtils.closeStream(in);
730    IOUtils.closeSocket(socket);
731    out = null;
732    in = null;
733    socket = null;
734  }
735
736  // close socket, reader, and clean up all pending calls.
737  private void closeConn(IOException e) {
738    if (thread == null) {
739      return;
740    }
741    thread.interrupt();
742    thread = null;
743    closeSocket();
744    if (callSender != null) {
745      callSender.cleanup(e);
746    }
747    for (Call call : calls.values()) {
748      call.setException(e);
749    }
750    calls.clear();
751  }
752
753  // release all resources, the connection will not be used any more.
754  @Override
755  public synchronized void shutdown() {
756    closed = true;
757    if (callSender != null) {
758      callSender.interrupt();
759    }
760    closeConn(new IOException("connection to " + remoteId.address + " closed"));
761  }
762
763  @Override
764  public void cleanupConnection() {
765    // do nothing
766  }
767
768  @Override
769  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
770      throws IOException {
771    pcrc.notifyOnCancel(new RpcCallback<Object>() {
772
773      @Override
774      public void run(Object parameter) {
775        setCancelled(call);
776        synchronized (BlockingRpcConnection.this) {
777          if (callSender != null) {
778            callSender.remove(call);
779          } else {
780            calls.remove(call.id);
781          }
782        }
783      }
784    }, new CancellationCallback() {
785
786      @Override
787      public void run(boolean cancelled) throws IOException {
788        if (cancelled) {
789          setCancelled(call);
790          return;
791        }
792        scheduleTimeoutTask(call);
793        if (callSender != null) {
794          callSender.sendCall(call);
795        } else {
796          tracedWriteRequest(call);
797        }
798      }
799    });
800  }
801
802  @Override
803  public synchronized boolean isActive() {
804    return thread != null;
805  }
806}