001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import io.opentelemetry.api.GlobalOpenTelemetry;
021import io.opentelemetry.context.Context;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.lang.reflect.InvocationTargetException;
025import java.net.ConnectException;
026import java.net.SocketTimeoutException;
027import java.nio.channels.ClosedChannelException;
028import java.util.concurrent.TimeoutException;
029import org.apache.commons.lang3.mutable.MutableInt;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HBaseIOException;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
035import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
036import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
037import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
038import org.apache.hadoop.hbase.net.Address;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.ipc.RemoteException;
042import org.apache.yetus.audience.InterfaceAudience;
043
044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
045import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
046import org.apache.hbase.thirdparty.com.google.protobuf.Message;
047import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
048import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
049import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
050
051import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
055
056/**
057 * Utility to help ipc'ing.
058 */
059@InterfaceAudience.Private
060class IPCUtil {
061
062  /**
063   * Write out header, param, and cell block if there is one.
064   * @param dos       Stream to write into
065   * @param header    to write
066   * @param param     to write
067   * @param cellBlock to write
068   * @return Total number of bytes written.
069   * @throws IOException if write action fails
070   */
071  public static int write(final OutputStream dos, final Message header, final Message param,
072    final ByteBuf cellBlock) throws IOException {
073    // Must calculate total size and write that first so other side can read it all in in one
074    // swoop. This is dictated by how the server is currently written. Server needs to change
075    // if we are to be able to write without the length prefixing.
076    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
077    if (cellBlock != null) {
078      totalSize += cellBlock.readableBytes();
079    }
080    return write(dos, header, param, cellBlock, totalSize);
081  }
082
083  private static int write(final OutputStream dos, final Message header, final Message param,
084    final ByteBuf cellBlock, final int totalSize) throws IOException {
085    // I confirmed toBytes does same as DataOutputStream#writeInt.
086    dos.write(Bytes.toBytes(totalSize));
087    // This allocates a buffer that is the size of the message internally.
088    header.writeDelimitedTo(dos);
089    if (param != null) {
090      param.writeDelimitedTo(dos);
091    }
092    if (cellBlock != null) {
093      cellBlock.readBytes(dos, cellBlock.readableBytes());
094    }
095    dos.flush();
096    return totalSize;
097  }
098
099  /**
100   * @return Size on the wire when the two messages are written with writeDelimitedTo
101   */
102  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
103    int totalSize = 0;
104    for (Message m : messages) {
105      if (m == null) {
106        continue;
107      }
108      totalSize += m.getSerializedSize();
109      totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize());
110    }
111    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
112    return totalSize;
113  }
114
115  static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
116    RequestHeader.Builder builder = RequestHeader.newBuilder();
117    builder.setCallId(call.id);
118    RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
119    GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
120      traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
121    builder.setTraceInfo(traceBuilder.build());
122    builder.setMethodName(call.md.getName());
123    builder.setRequestParam(call.param != null);
124    if (cellBlockMeta != null) {
125      builder.setCellBlockMeta(cellBlockMeta);
126    }
127    // Only pass priority if there is one set.
128    if (call.priority != HConstants.PRIORITY_UNSET) {
129      builder.setPriority(call.priority);
130    }
131    builder.setTimeout(call.timeout);
132
133    return builder.build();
134  }
135
136  /**
137   * @param e exception to be wrapped
138   * @return RemoteException made from passed <code>e</code>
139   */
140  static RemoteException createRemoteException(final ExceptionResponse e) {
141    String innerExceptionClassName = e.getExceptionClassName();
142    boolean doNotRetry = e.getDoNotRetry();
143    boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded();
144    return e.hasHostname() ?
145    // If a hostname then add it to the RemoteWithExtrasException
146      new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
147        e.getPort(), doNotRetry, serverOverloaded)
148      : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry,
149        serverOverloaded);
150  }
151
152  /**
153   * @return True if the exception is a fatal connection exception.
154   */
155  static boolean isFatalConnectionException(final ExceptionResponse e) {
156    return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
157  }
158
159  static IOException toIOE(Throwable t) {
160    if (t instanceof IOException) {
161      return (IOException) t;
162    } else {
163      return new IOException(t);
164    }
165  }
166
167  private static String getCallTarget(Address addr, RegionInfo regionInfo) {
168    return "address=" + addr
169      + (regionInfo != null ? ", region=" + regionInfo.getRegionNameAsString() : "");
170  }
171
172  /**
173   * Takes an Exception, the address, and if pertinent, the RegionInfo for the Region we were trying
174   * to connect to and returns an IOException with the input exception as the cause. The new
175   * exception provides the stack trace of the place where the exception is thrown and some extra
176   * diagnostics information.
177   * <p/>
178   * Notice that we will try our best to keep the original exception type when creating a new
179   * exception, especially for the 'connection' exceptions, as it is used to determine whether this
180   * is a network issue or the remote side tells us clearly what is wrong, which is important
181   * deciding whether to retry. If it is not possible to create a new exception with the same type,
182   * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be
183   * created.
184   * @param addr  target address
185   * @param error the relevant exception
186   * @return an exception to throw
187   * @see ClientExceptionsUtil#isConnectionException(Throwable)
188   */
189  static IOException wrapException(Address addr, RegionInfo regionInfo, Throwable error) {
190    if (error instanceof ConnectException) {
191      // connection refused; include the host:port in the error
192      return (IOException) new ConnectException(
193        "Call to " + getCallTarget(addr, regionInfo) + " failed on connection exception: " + error)
194          .initCause(error);
195    } else if (error instanceof SocketTimeoutException) {
196      return (IOException) new SocketTimeoutException(
197        "Call to " + getCallTarget(addr, regionInfo) + " failed because " + error).initCause(error);
198    } else if (error instanceof ConnectionClosingException) {
199      return new ConnectionClosingException(
200        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
201        error);
202    } else if (error instanceof ServerTooBusyException) {
203      // we already have address in the exception message
204      return (IOException) error;
205    } else if (error instanceof DoNotRetryIOException) {
206      // try our best to keep the original exception type
207      try {
208        return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class)
209          .getConstructor(String.class)
210          .newInstance(
211            "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error)
212          .initCause(error);
213      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
214        | InvocationTargetException | NoSuchMethodException | SecurityException e) {
215        // just ignore, will just new a DoNotRetryIOException instead below
216      }
217      return new DoNotRetryIOException(
218        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
219        error);
220    } else if (error instanceof ConnectionClosedException) {
221      return new ConnectionClosedException(
222        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
223        error);
224    } else if (error instanceof CallTimeoutException) {
225      return new CallTimeoutException(
226        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
227        error);
228    } else if (error instanceof ClosedChannelException) {
229      // ClosedChannelException does not have a constructor which takes a String but it is a
230      // connection exception so we keep its original type
231      return (IOException) error;
232    } else if (error instanceof TimeoutException) {
233      // TimeoutException is not an IOException, let's convert it to TimeoutIOException.
234      return new TimeoutIOException(
235        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
236        error);
237    } else {
238      // try our best to keep the original exception type
239      if (error instanceof IOException) {
240        try {
241          return (IOException) error.getClass().asSubclass(IOException.class)
242            .getConstructor(String.class)
243            .newInstance(
244              "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error)
245            .initCause(error);
246        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
247          | InvocationTargetException | NoSuchMethodException | SecurityException e) {
248          // just ignore, will just new an IOException instead below
249        }
250      }
251      return new HBaseIOException(
252        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
253        error);
254    }
255  }
256
257  static void setCancelled(Call call) {
258    call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
259      + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
260      + call.timeout));
261  }
262
263  private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {
264
265    @Override
266    protected MutableInt initialValue() throws Exception {
267      return new MutableInt(0);
268    }
269  };
270
271  static final int MAX_DEPTH = 4;
272
273  static void execute(EventLoop eventLoop, Runnable action) {
274    if (eventLoop.inEventLoop()) {
275      // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel
276      // implementation.
277      MutableInt depth = DEPTH.get();
278      if (depth.intValue() < MAX_DEPTH) {
279        depth.increment();
280        try {
281          action.run();
282        } finally {
283          depth.decrement();
284        }
285      } else {
286        eventLoop.execute(action);
287      }
288    } else {
289      eventLoop.execute(action);
290    }
291  }
292}