View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import com.google.common.base.Preconditions;
21  import com.google.protobuf.CodedOutputStream;
22  import com.google.protobuf.Message;
23
24  import java.io.IOException;
25  import java.io.OutputStream;
26  import java.net.ConnectException;
27  import java.net.InetSocketAddress;
28  import java.net.SocketTimeoutException;
29  import java.nio.ByteBuffer;
30
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
33  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
34  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
35  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
36  import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39  import org.apache.hadoop.ipc.RemoteException;
40
41  /**
42   * Utility to help ipc'ing.
43   */
44  @InterfaceAudience.Private
45  class IPCUtil {
46
47    /**
48     * Write out header, param, and cell block if there is one.
49     * @param dos Stream to write into
50     * @param header to write
51     * @param param to write
52     * @param cellBlock to write
53     * @return Total number of bytes written.
54     * @throws IOException if write action fails
55     */
56    public static int write(final OutputStream dos, final Message header, final Message param,
57        final ByteBuffer cellBlock) throws IOException {
58      // Must calculate total size and write that first so other side can read it all in in one
59      // swoop. This is dictated by how the server is currently written. Server needs to change
60      // if we are to be able to write without the length prefixing.
61      int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
62      if (cellBlock != null) {
63        totalSize += cellBlock.remaining();
64      }
65      return write(dos, header, param, cellBlock, totalSize);
66    }
67
68    private static int write(final OutputStream dos, final Message header, final Message param,
69        final ByteBuffer cellBlock, final int totalSize) throws IOException {
70      // I confirmed toBytes does same as DataOutputStream#writeInt.
71      dos.write(Bytes.toBytes(totalSize));
72      // This allocates a buffer that is the size of the message internally.
73      header.writeDelimitedTo(dos);
74      if (param != null) {
75        param.writeDelimitedTo(dos);
76      }
77      if (cellBlock != null) {
78        dos.write(cellBlock.array(), 0, cellBlock.remaining());
79      }
80      dos.flush();
81      return totalSize;
82    }
83
84    /**
85     * @return Size on the wire when the two messages are written with writeDelimitedTo
86     */
87    public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
88      int totalSize = 0;
89      for (Message m : messages) {
90        if (m == null) {
91          continue;
92        }
93        totalSize += m.getSerializedSize();
94        totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
95      }
96      Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
97      return totalSize;
98    }
99
100   static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
101     RequestHeader.Builder builder = RequestHeader.newBuilder();
102     builder.setCallId(call.id);
103     if (call.span != null) {
104       builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
105           .setTraceId(call.span.getTraceId()));
106     }
107     builder.setMethodName(call.md.getName());
108     builder.setRequestParam(call.param != null);
109     if (cellBlockMeta != null) {
110       builder.setCellBlockMeta(cellBlockMeta);
111     }
112     // Only pass priority if there is one set.
113     if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
114       builder.setPriority(call.priority);
115     }
116     builder.setTimeout(call.timeout);
117
118     return builder.build();
119   }
120
121   /**
122    * @param e exception to be wrapped
123    * @return RemoteException made from passed <code>e</code>
124    */
125   static RemoteException createRemoteException(final ExceptionResponse e) {
126     String innerExceptionClassName = e.getExceptionClassName();
127     boolean doNotRetry = e.getDoNotRetry();
128     return e.hasHostname() ?
129     // If a hostname then add it to the RemoteWithExtrasException
130         new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
131             e.getPort(), doNotRetry)
132         : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
133   }
134
135   /**
136    * @return True if the exception is a fatal connection exception.
137    */
138   static boolean isFatalConnectionException(final ExceptionResponse e) {
139     return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
140   }
141
142   static IOException toIOE(Throwable t) {
143     if (t instanceof IOException) {
144       return (IOException) t;
145     } else {
146       return new IOException(t);
147     }
148   }
149
150   /**
151    * Takes an Exception and the address we were trying to connect to and return an IOException with
152    * the input exception as the cause. The new exception provides the stack trace of the place where
153    * the exception is thrown and some extra diagnostics information. If the exception is
154    * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
155    * an IOException.
156    * @param addr target address
157    * @param exception the relevant exception
158    * @return an exception to throw
159    */
160   static IOException wrapException(InetSocketAddress addr, Exception exception) {
161     if (exception instanceof ConnectException) {
162       // connection refused; include the host:port in the error
163       return (ConnectException) new ConnectException(
164           "Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
165     } else if (exception instanceof SocketTimeoutException) {
166       return (SocketTimeoutException) new SocketTimeoutException(
167           "Call to " + addr + " failed because " + exception).initCause(exception);
168     } else if (exception instanceof ConnectionClosingException) {
169       return (ConnectionClosingException) new ConnectionClosingException(
170           "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
171     } else {
172       return (IOException) new IOException(
173           "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
174     }
175   }
176
177   static void setCancelled(Call call) {
178     call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime="
179         + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimetout="
180         + call.timeout));
181   }
182 }