001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.io.OutputStream;
022import java.lang.reflect.InvocationTargetException;
023import java.net.ConnectException;
024import java.net.InetSocketAddress;
025import java.net.SocketTimeoutException;
026import java.nio.channels.ClosedChannelException;
027import java.util.concurrent.TimeoutException;
028import org.apache.commons.lang3.mutable.MutableInt;
029import org.apache.hadoop.hbase.DoNotRetryIOException;
030import org.apache.hadoop.hbase.HBaseIOException;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
033import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
034import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
035import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.ipc.RemoteException;
039import org.apache.yetus.audience.InterfaceAudience;
040
041import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
042import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
043import org.apache.hbase.thirdparty.com.google.protobuf.Message;
044import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
045import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
046import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
047
048import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
051
052/**
053 * Utility to help ipc'ing.
054 */
055@InterfaceAudience.Private
056class IPCUtil {
057
058  /**
059   * Write out header, param, and cell block if there is one.
060   * @param dos Stream to write into
061   * @param header to write
062   * @param param to write
063   * @param cellBlock to write
064   * @return Total number of bytes written.
065   * @throws IOException if write action fails
066   */
067  public static int write(final OutputStream dos, final Message header, final Message param,
068      final ByteBuf cellBlock) throws IOException {
069    // Must calculate total size and write that first so other side can read it all in in one
070    // swoop. This is dictated by how the server is currently written. Server needs to change
071    // if we are to be able to write without the length prefixing.
072    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
073    if (cellBlock != null) {
074      totalSize += cellBlock.readableBytes();
075    }
076    return write(dos, header, param, cellBlock, totalSize);
077  }
078
079  private static int write(final OutputStream dos, final Message header, final Message param,
080      final ByteBuf cellBlock, final int totalSize) throws IOException {
081    // I confirmed toBytes does same as DataOutputStream#writeInt.
082    dos.write(Bytes.toBytes(totalSize));
083    // This allocates a buffer that is the size of the message internally.
084    header.writeDelimitedTo(dos);
085    if (param != null) {
086      param.writeDelimitedTo(dos);
087    }
088    if (cellBlock != null) {
089      cellBlock.readBytes(dos, cellBlock.readableBytes());
090    }
091    dos.flush();
092    return totalSize;
093  }
094
095  /**
096   * @return Size on the wire when the two messages are written with writeDelimitedTo
097   */
098  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
099    int totalSize = 0;
100    for (Message m : messages) {
101      if (m == null) {
102        continue;
103      }
104      totalSize += m.getSerializedSize();
105      totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize());
106    }
107    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
108    return totalSize;
109  }
110
111  static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
112    RequestHeader.Builder builder = RequestHeader.newBuilder();
113    builder.setCallId(call.id);
114    //TODO handle htrace API change, see HBASE-18895
115    /*if (call.span != null) {
116      builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
117          .setTraceId(call.span.getTracerId()));
118    }*/
119    builder.setMethodName(call.md.getName());
120    builder.setRequestParam(call.param != null);
121    if (cellBlockMeta != null) {
122      builder.setCellBlockMeta(cellBlockMeta);
123    }
124    // Only pass priority if there is one set.
125    if (call.priority != HConstants.PRIORITY_UNSET) {
126      builder.setPriority(call.priority);
127    }
128    builder.setTimeout(call.timeout);
129
130    return builder.build();
131  }
132
133  /**
134   * @param e exception to be wrapped
135   * @return RemoteException made from passed <code>e</code>
136   */
137  static RemoteException createRemoteException(final ExceptionResponse e) {
138    String innerExceptionClassName = e.getExceptionClassName();
139    boolean doNotRetry = e.getDoNotRetry();
140    return e.hasHostname() ?
141    // If a hostname then add it to the RemoteWithExtrasException
142        new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
143            e.getPort(), doNotRetry)
144        : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
145  }
146
147  /**
148   * @return True if the exception is a fatal connection exception.
149   */
150  static boolean isFatalConnectionException(final ExceptionResponse e) {
151    return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
152  }
153
154  static IOException toIOE(Throwable t) {
155    if (t instanceof IOException) {
156      return (IOException) t;
157    } else {
158      return new IOException(t);
159    }
160  }
161
162  /**
163   * Takes an Exception and the address we were trying to connect to and return an IOException with
164   * the input exception as the cause. The new exception provides the stack trace of the place where
165   * the exception is thrown and some extra diagnostics information.
166   * <p/>
167   * Notice that we will try our best to keep the original exception type when creating a new
168   * exception, especially for the 'connection' exceptions, as it is used to determine whether this
169   * is a network issue or the remote side tells us clearly what is wrong, which is very important
170   * to decide whether to retry. If it is not possible to create a new exception with the same type,
171   * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be
172   * created.
173   * @param addr target address
174   * @param error the relevant exception
175   * @return an exception to throw
176   * @see ClientExceptionsUtil#isConnectionException(Throwable)
177   */
178  static IOException wrapException(InetSocketAddress addr, Throwable error) {
179    if (error instanceof ConnectException) {
180      // connection refused; include the host:port in the error
181      return (IOException) new ConnectException(
182        "Call to " + addr + " failed on connection exception: " + error).initCause(error);
183    } else if (error instanceof SocketTimeoutException) {
184      return (IOException) new SocketTimeoutException(
185        "Call to " + addr + " failed because " + error).initCause(error);
186    } else if (error instanceof ConnectionClosingException) {
187      return new ConnectionClosingException("Call to " + addr + " failed on local exception: "
188        + error, error);
189    } else if (error instanceof ServerTooBusyException) {
190      // we already have address in the exception message
191      return (IOException) error;
192    } else if (error instanceof DoNotRetryIOException) {
193      // try our best to keep the original exception type
194      try {
195        return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class)
196          .getConstructor(String.class)
197          .newInstance("Call to " + addr + " failed on local exception: " + error).initCause(error);
198      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
199          | InvocationTargetException | NoSuchMethodException | SecurityException e) {
200        // just ignore, will just new a DoNotRetryIOException instead below
201      }
202      return new DoNotRetryIOException("Call to " + addr + " failed on local exception: "
203        + error, error);
204    } else if (error instanceof ConnectionClosedException) {
205      return new ConnectionClosedException("Call to " + addr + " failed on local exception: "
206        + error, error);
207    } else if (error instanceof CallTimeoutException) {
208      return new CallTimeoutException("Call to " + addr + " failed on local exception: "
209        + error, error);
210    } else if (error instanceof ClosedChannelException) {
211      // ClosedChannelException does not have a constructor which takes a String but it is a
212      // connection exception so we keep its original type
213      return (IOException) error;
214    } else if (error instanceof TimeoutException) {
215      // TimeoutException is not an IOException, let's convert it to TimeoutIOException.
216      return new TimeoutIOException("Call to " + addr + " failed on local exception: "
217        + error, error);
218    } else {
219      // try our best to keep the original exception type
220      if (error instanceof IOException) {
221        try {
222          return (IOException) error.getClass().asSubclass(IOException.class)
223            .getConstructor(String.class)
224            .newInstance("Call to " + addr + " failed on local exception: " + error)
225            .initCause(error);
226        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
227            | InvocationTargetException | NoSuchMethodException | SecurityException e) {
228          // just ignore, will just new an IOException instead below
229        }
230      }
231      return new HBaseIOException("Call to " + addr + " failed on local exception: "
232        + error, error);
233    }
234  }
235
236  static void setCancelled(Call call) {
237    call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
238        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
239        + call.timeout));
240  }
241
242  private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {
243
244    @Override
245    protected MutableInt initialValue() throws Exception {
246      return new MutableInt(0);
247    }
248  };
249
250  static final int MAX_DEPTH = 4;
251
252  static void execute(EventLoop eventLoop, Runnable action) {
253    if (eventLoop.inEventLoop()) {
254      // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel
255      // implementation.
256      MutableInt depth = DEPTH.get();
257      if (depth.intValue() < MAX_DEPTH) {
258        depth.increment();
259        try {
260          action.run();
261        } finally {
262          depth.decrement();
263        }
264      } else {
265        eventLoop.execute(action);
266      }
267    } else {
268      eventLoop.execute(action);
269    }
270  }
271}