001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.io.OutputStream;
022import java.lang.reflect.InvocationTargetException;
023import java.net.ConnectException;
024import java.net.InetSocketAddress;
025import java.net.SocketTimeoutException;
026import java.nio.channels.ClosedChannelException;
027import java.util.concurrent.TimeoutException;
028import org.apache.commons.lang3.mutable.MutableInt;
029import org.apache.hadoop.hbase.DoNotRetryIOException;
030import org.apache.hadoop.hbase.HBaseIOException;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
033import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
034import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
035import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.ipc.RemoteException;
039import org.apache.yetus.audience.InterfaceAudience;
040
041import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
042import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
043import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
044import org.apache.hbase.thirdparty.com.google.protobuf.Message;
045import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
046import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
047import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
048
049import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
052
053/**
054 * Utility to help ipc'ing.
055 */
056@InterfaceAudience.Private
057class IPCUtil {
058
059  /**
060   * Write out header, param, and cell block if there is one.
061   * @param dos Stream to write into
062   * @param header to write
063   * @param param to write
064   * @param cellBlock to write
065   * @return Total number of bytes written.
066   * @throws IOException if write action fails
067   */
068  public static int write(final OutputStream dos, final Message header, final Message param,
069      final ByteBuf cellBlock) throws IOException {
070    // Must calculate total size and write that first so other side can read it all in in one
071    // swoop. This is dictated by how the server is currently written. Server needs to change
072    // if we are to be able to write without the length prefixing.
073    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
074    if (cellBlock != null) {
075      totalSize += cellBlock.readableBytes();
076    }
077    return write(dos, header, param, cellBlock, totalSize);
078  }
079
080  private static int write(final OutputStream dos, final Message header, final Message param,
081      final ByteBuf cellBlock, final int totalSize) throws IOException {
082    // I confirmed toBytes does same as DataOutputStream#writeInt.
083    dos.write(Bytes.toBytes(totalSize));
084    // This allocates a buffer that is the size of the message internally.
085    header.writeDelimitedTo(dos);
086    if (param != null) {
087      param.writeDelimitedTo(dos);
088    }
089    if (cellBlock != null) {
090      cellBlock.readBytes(dos, cellBlock.readableBytes());
091    }
092    dos.flush();
093    return totalSize;
094  }
095
096  /**
097   * @return Size on the wire when the two messages are written with writeDelimitedTo
098   */
099  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
100    int totalSize = 0;
101    for (Message m : messages) {
102      if (m == null) {
103        continue;
104      }
105      totalSize += m.getSerializedSize();
106      totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize());
107    }
108    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
109    return totalSize;
110  }
111
112  static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
113    RequestHeader.Builder builder = RequestHeader.newBuilder();
114    builder.setCallId(call.id);
115    //TODO handle htrace API change, see HBASE-18895
116    /*if (call.span != null) {
117      builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
118          .setTraceId(call.span.getTracerId()));
119    }*/
120    builder.setMethodName(call.md.getName());
121    builder.setRequestParam(call.param != null);
122    if (cellBlockMeta != null) {
123      builder.setCellBlockMeta(cellBlockMeta);
124    }
125    // Only pass priority if there is one set.
126    if (call.priority != HConstants.PRIORITY_UNSET) {
127      builder.setPriority(call.priority);
128    }
129    builder.setTimeout(call.timeout);
130
131    return builder.build();
132  }
133
134  /**
135   * @param e exception to be wrapped
136   * @return RemoteException made from passed <code>e</code>
137   */
138  static RemoteException createRemoteException(final ExceptionResponse e) {
139    String innerExceptionClassName = e.getExceptionClassName();
140    boolean doNotRetry = e.getDoNotRetry();
141    return e.hasHostname() ?
142    // If a hostname then add it to the RemoteWithExtrasException
143        new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
144            e.getPort(), doNotRetry)
145        : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
146  }
147
148  /**
149   * @return True if the exception is a fatal connection exception.
150   */
151  static boolean isFatalConnectionException(final ExceptionResponse e) {
152    return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
153  }
154
155  static IOException toIOE(Throwable t) {
156    if (t instanceof IOException) {
157      return (IOException) t;
158    } else {
159      return new IOException(t);
160    }
161  }
162
163  /**
164   * Takes an Exception and the address we were trying to connect to and return an IOException with
165   * the input exception as the cause. The new exception provides the stack trace of the place where
166   * the exception is thrown and some extra diagnostics information.
167   * <p/>
168   * Notice that we will try our best to keep the original exception type when creating a new
169   * exception, especially for the 'connection' exceptions, as it is used to determine whether this
170   * is a network issue or the remote side tells us clearly what is wrong, which is very important
171   * to decide whether to retry. If it is not possible to create a new exception with the same type,
172   * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be
173   * created.
174   * @param addr target address
175   * @param error the relevant exception
176   * @return an exception to throw
177   * @see ClientExceptionsUtil#isConnectionException(Throwable)
178   */
179  static IOException wrapException(InetSocketAddress addr, Throwable error) {
180    if (error instanceof ConnectException) {
181      // connection refused; include the host:port in the error
182      return (IOException) new ConnectException(
183        "Call to " + addr + " failed on connection exception: " + error).initCause(error);
184    } else if (error instanceof SocketTimeoutException) {
185      return (IOException) new SocketTimeoutException(
186        "Call to " + addr + " failed because " + error).initCause(error);
187    } else if (error instanceof ConnectionClosingException) {
188      return new ConnectionClosingException("Call to " + addr + " failed on local exception: "
189        + error, error);
190    } else if (error instanceof ServerTooBusyException) {
191      // we already have address in the exception message
192      return (IOException) error;
193    } else if (error instanceof DoNotRetryIOException) {
194      // try our best to keep the original exception type
195      try {
196        return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class)
197          .getConstructor(String.class)
198          .newInstance("Call to " + addr + " failed on local exception: " + error).initCause(error);
199      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
200          | InvocationTargetException | NoSuchMethodException | SecurityException e) {
201        // just ignore, will just new a DoNotRetryIOException instead below
202      }
203      return new DoNotRetryIOException("Call to " + addr + " failed on local exception: "
204        + error, error);
205    } else if (error instanceof ConnectionClosedException) {
206      return new ConnectionClosedException("Call to " + addr + " failed on local exception: "
207        + error, error);
208    } else if (error instanceof CallTimeoutException) {
209      return new CallTimeoutException("Call to " + addr + " failed on local exception: "
210        + error, error);
211    } else if (error instanceof ClosedChannelException) {
212      // ClosedChannelException does not have a constructor which takes a String but it is a
213      // connection exception so we keep its original type
214      return (IOException) error;
215    } else if (error instanceof TimeoutException) {
216      // TimeoutException is not an IOException, let's convert it to TimeoutIOException.
217      return new TimeoutIOException("Call to " + addr + " failed on local exception: "
218        + error, error);
219    } else {
220      // try our best to keep the original exception type
221      if (error instanceof IOException) {
222        try {
223          return (IOException) error.getClass().asSubclass(IOException.class)
224            .getConstructor(String.class)
225            .newInstance("Call to " + addr + " failed on local exception: " + error)
226            .initCause(error);
227        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
228            | InvocationTargetException | NoSuchMethodException | SecurityException e) {
229          // just ignore, will just new an IOException instead below
230        }
231      }
232      return new HBaseIOException("Call to " + addr + " failed on local exception: "
233        + error, error);
234    }
235  }
236
237  static void setCancelled(Call call) {
238    call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
239        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
240        + call.timeout));
241  }
242
243  private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {
244
245    @Override
246    protected MutableInt initialValue() throws Exception {
247      return new MutableInt(0);
248    }
249  };
250
251  @VisibleForTesting
252  static final int MAX_DEPTH = 4;
253
254  static void execute(EventLoop eventLoop, Runnable action) {
255    if (eventLoop.inEventLoop()) {
256      // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel
257      // implementation.
258      MutableInt depth = DEPTH.get();
259      if (depth.intValue() < MAX_DEPTH) {
260        depth.increment();
261        try {
262          action.run();
263        } finally {
264          depth.decrement();
265        }
266      } else {
267        eventLoop.execute(action);
268      }
269    } else {
270      eventLoop.execute(action);
271    }
272  }
273}