001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.io.OutputStream;
022import java.lang.reflect.InvocationTargetException;
023import java.net.ConnectException;
024import java.net.InetSocketAddress;
025import java.net.SocketTimeoutException;
026import java.nio.channels.ClosedChannelException;
027import java.util.concurrent.TimeoutException;
028import org.apache.hadoop.hbase.DoNotRetryIOException;
029import org.apache.hadoop.hbase.HBaseIOException;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
032import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
033import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
034import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.ipc.RemoteException;
038import org.apache.yetus.audience.InterfaceAudience;
039
040import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
041import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
042import org.apache.hbase.thirdparty.com.google.protobuf.Message;
043import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
047
048/**
049 * Utility to help ipc'ing.
050 */
051@InterfaceAudience.Private
052class IPCUtil {
053
054  /**
055   * Write out header, param, and cell block if there is one.
056   * @param dos Stream to write into
057   * @param header to write
058   * @param param to write
059   * @param cellBlock to write
060   * @return Total number of bytes written.
061   * @throws IOException if write action fails
062   */
063  public static int write(final OutputStream dos, final Message header, final Message param,
064      final ByteBuf cellBlock) throws IOException {
065    // Must calculate total size and write that first so other side can read it all in in one
066    // swoop. This is dictated by how the server is currently written. Server needs to change
067    // if we are to be able to write without the length prefixing.
068    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
069    if (cellBlock != null) {
070      totalSize += cellBlock.readableBytes();
071    }
072    return write(dos, header, param, cellBlock, totalSize);
073  }
074
075  private static int write(final OutputStream dos, final Message header, final Message param,
076      final ByteBuf cellBlock, final int totalSize) throws IOException {
077    // I confirmed toBytes does same as DataOutputStream#writeInt.
078    dos.write(Bytes.toBytes(totalSize));
079    // This allocates a buffer that is the size of the message internally.
080    header.writeDelimitedTo(dos);
081    if (param != null) {
082      param.writeDelimitedTo(dos);
083    }
084    if (cellBlock != null) {
085      cellBlock.readBytes(dos, cellBlock.readableBytes());
086    }
087    dos.flush();
088    return totalSize;
089  }
090
091  /**
092   * @return Size on the wire when the two messages are written with writeDelimitedTo
093   */
094  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
095    int totalSize = 0;
096    for (Message m : messages) {
097      if (m == null) {
098        continue;
099      }
100      totalSize += m.getSerializedSize();
101      totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize());
102    }
103    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
104    return totalSize;
105  }
106
107  static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
108    RequestHeader.Builder builder = RequestHeader.newBuilder();
109    builder.setCallId(call.id);
110    //TODO handle htrace API change, see HBASE-18895
111    /*if (call.span != null) {
112      builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
113          .setTraceId(call.span.getTracerId()));
114    }*/
115    builder.setMethodName(call.md.getName());
116    builder.setRequestParam(call.param != null);
117    if (cellBlockMeta != null) {
118      builder.setCellBlockMeta(cellBlockMeta);
119    }
120    // Only pass priority if there is one set.
121    if (call.priority != HConstants.PRIORITY_UNSET) {
122      builder.setPriority(call.priority);
123    }
124    builder.setTimeout(call.timeout);
125
126    return builder.build();
127  }
128
129  /**
130   * @param e exception to be wrapped
131   * @return RemoteException made from passed <code>e</code>
132   */
133  static RemoteException createRemoteException(final ExceptionResponse e) {
134    String innerExceptionClassName = e.getExceptionClassName();
135    boolean doNotRetry = e.getDoNotRetry();
136    return e.hasHostname() ?
137    // If a hostname then add it to the RemoteWithExtrasException
138        new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
139            e.getPort(), doNotRetry)
140        : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
141  }
142
143  /**
144   * @return True if the exception is a fatal connection exception.
145   */
146  static boolean isFatalConnectionException(final ExceptionResponse e) {
147    return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
148  }
149
150  static IOException toIOE(Throwable t) {
151    if (t instanceof IOException) {
152      return (IOException) t;
153    } else {
154      return new IOException(t);
155    }
156  }
157
158  /**
159   * Takes an Exception and the address we were trying to connect to and return an IOException with
160   * the input exception as the cause. The new exception provides the stack trace of the place where
161   * the exception is thrown and some extra diagnostics information.
162   * <p/>
163   * Notice that we will try our best to keep the original exception type when creating a new
164   * exception, especially for the 'connection' exceptions, as it is used to determine whether this
165   * is a network issue or the remote side tells us clearly what is wrong, which is very important
166   * to decide whether to retry. If it is not possible to create a new exception with the same type,
167   * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be
168   * created.
169   * @param addr target address
170   * @param error the relevant exception
171   * @return an exception to throw
172   * @see ClientExceptionsUtil#isConnectionException(Throwable)
173   */
174  static IOException wrapException(InetSocketAddress addr, Throwable error) {
175    if (error instanceof ConnectException) {
176      // connection refused; include the host:port in the error
177      return (IOException) new ConnectException(
178        "Call to " + addr + " failed on connection exception: " + error).initCause(error);
179    } else if (error instanceof SocketTimeoutException) {
180      return (IOException) new SocketTimeoutException(
181        "Call to " + addr + " failed because " + error).initCause(error);
182    } else if (error instanceof ConnectionClosingException) {
183      return (IOException) new ConnectionClosingException(
184        "Call to " + addr + " failed on local exception: " + error).initCause(error);
185    } else if (error instanceof ServerTooBusyException) {
186      // we already have address in the exception message
187      return (IOException) error;
188    } else if (error instanceof DoNotRetryIOException) {
189      // try our best to keep the original exception type
190      try {
191        return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class)
192          .getConstructor(String.class)
193          .newInstance("Call to " + addr + " failed on local exception: " + error).initCause(error);
194      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
195          | InvocationTargetException | NoSuchMethodException | SecurityException e) {
196        // just ignore, will just new a DoNotRetryIOException instead below
197      }
198      return (IOException) new DoNotRetryIOException(
199        "Call to " + addr + " failed on local exception: " + error).initCause(error);
200    } else if (error instanceof ConnectionClosedException) {
201      return (IOException) new ConnectionClosedException(
202        "Call to " + addr + " failed on local exception: " + error).initCause(error);
203    } else if (error instanceof CallTimeoutException) {
204      return (IOException) new CallTimeoutException(
205        "Call to " + addr + " failed on local exception: " + error).initCause(error);
206    } else if (error instanceof ClosedChannelException) {
207      // ClosedChannelException does not have a constructor which takes a String but it is a
208      // connection exception so we keep its original type
209      return (IOException) error;
210    } else if (error instanceof TimeoutException) {
211      // TimeoutException is not an IOException, let's convert it to TimeoutIOException.
212      return (IOException) new TimeoutIOException(
213        "Call to " + addr + " failed on local exception: " + error).initCause(error);
214    } else {
215      // try our best to keep the original exception type
216      if (error instanceof IOException) {
217        try {
218          return (IOException) error.getClass().asSubclass(IOException.class)
219            .getConstructor(String.class)
220            .newInstance("Call to " + addr + " failed on local exception: " + error)
221            .initCause(error);
222        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
223            | InvocationTargetException | NoSuchMethodException | SecurityException e) {
224          // just ignore, will just new an IOException instead below
225        }
226      }
227      return (IOException) new HBaseIOException(
228        "Call to " + addr + " failed on local exception: " + error).initCause(error);
229    }
230  }
231
232  static void setCancelled(Call call) {
233    call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
234        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
235        + call.timeout));
236  }
237}