001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import io.opentelemetry.api.GlobalOpenTelemetry;
021import io.opentelemetry.context.Context;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.lang.reflect.InvocationTargetException;
025import java.net.ConnectException;
026import java.net.SocketTimeoutException;
027import java.nio.channels.ClosedChannelException;
028import java.util.Map;
029import java.util.concurrent.TimeoutException;
030import org.apache.commons.lang3.mutable.MutableInt;
031import org.apache.hadoop.hbase.DoNotRetryIOException;
032import org.apache.hadoop.hbase.HBaseIOException;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
036import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
037import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
038import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
039import org.apache.hadoop.hbase.net.Address;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.hadoop.ipc.RemoteException;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
048import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
049import org.apache.hbase.thirdparty.com.google.protobuf.Message;
050import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
051import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
052import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
053import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
054
055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
060
061/**
062 * Utility to help ipc'ing.
063 */
064@InterfaceAudience.Private
065class IPCUtil {
066
067  private static final Logger LOG = LoggerFactory.getLogger(IPCUtil.class);
068
069  /**
070   * Write out header, param, and cell block if there is one.
071   * @param dos       Stream to write into
072   * @param header    to write
073   * @param param     to write
074   * @param cellBlock to write
075   * @return Total number of bytes written.
076   * @throws IOException if write action fails
077   */
078  public static int write(final OutputStream dos, final Message header, final Message param,
079    final ByteBuf cellBlock) throws IOException {
080    // Must calculate total size and write that first so other side can read it all in in one
081    // swoop. This is dictated by how the server is currently written. Server needs to change
082    // if we are to be able to write without the length prefixing.
083    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
084    if (cellBlock != null) {
085      totalSize += cellBlock.readableBytes();
086    }
087    return write(dos, header, param, cellBlock, totalSize);
088  }
089
090  private static int write(final OutputStream dos, final Message header, final Message param,
091    final ByteBuf cellBlock, final int totalSize) throws IOException {
092    // I confirmed toBytes does same as DataOutputStream#writeInt.
093    dos.write(Bytes.toBytes(totalSize));
094    // This allocates a buffer that is the size of the message internally.
095    header.writeDelimitedTo(dos);
096    if (param != null) {
097      param.writeDelimitedTo(dos);
098    }
099    if (cellBlock != null) {
100      cellBlock.readBytes(dos, cellBlock.readableBytes());
101    }
102    dos.flush();
103    return totalSize;
104  }
105
106  /** Returns Size on the wire when the two messages are written with writeDelimitedTo */
107  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
108    int totalSize = 0;
109    for (Message m : messages) {
110      if (m == null) {
111        continue;
112      }
113      totalSize += m.getSerializedSize();
114      totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize());
115    }
116    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
117    return totalSize;
118  }
119
120  static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
121    RequestHeader.Builder builder = RequestHeader.newBuilder();
122    builder.setCallId(call.id);
123    RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
124    GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
125      traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
126    builder.setTraceInfo(traceBuilder.build());
127    builder.setMethodName(call.md.getName());
128    builder.setRequestParam(call.param != null);
129    if (cellBlockMeta != null) {
130      builder.setCellBlockMeta(cellBlockMeta);
131    }
132    // Only pass priority if there is one set.
133    if (call.priority != HConstants.PRIORITY_UNSET) {
134      builder.setPriority(call.priority);
135    }
136    if (call.attributes != null && !call.attributes.isEmpty()) {
137      HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder();
138      for (Map.Entry<String, byte[]> attribute : call.attributes.entrySet()) {
139        attributeBuilder.setName(attribute.getKey());
140        attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
141        builder.addAttribute(attributeBuilder.build());
142      }
143    }
144    builder.setTimeout(call.timeout);
145
146    return builder.build();
147  }
148
149  /**
150   * @param e exception to be wrapped
151   * @return RemoteException made from passed <code>e</code>
152   */
153  static RemoteException createRemoteException(final ExceptionResponse e) {
154    String innerExceptionClassName = e.getExceptionClassName();
155    boolean doNotRetry = e.getDoNotRetry();
156    boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded();
157    return e.hasHostname() ?
158    // If a hostname then add it to the RemoteWithExtrasException
159      new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
160        e.getPort(), doNotRetry, serverOverloaded)
161      : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry,
162        serverOverloaded);
163  }
164
165  /** Returns True if the exception is a fatal connection exception. */
166  static boolean isFatalConnectionException(ExceptionResponse e) {
167    if (e.getExceptionClassName().equals(FatalConnectionException.class.getName())) {
168      return true;
169    }
170    // try our best to check for sub classes of FatalConnectionException
171    try {
172      return e.getExceptionClassName() != null && FatalConnectionException.class.isAssignableFrom(
173        Class.forName(e.getExceptionClassName(), false, IPCUtil.class.getClassLoader()));
174      // Class.forName may throw ExceptionInInitializerError so we have to catch Throwable here
175    } catch (Throwable t) {
176      LOG.debug("Can not get class object for {}", e.getExceptionClassName(), t);
177      return false;
178    }
179  }
180
181  static boolean isSecurityNotEnabledException(IOException e) {
182    return e instanceof RemoteException
183      && SecurityNotEnabledException.class.getName().equals(((RemoteException) e).getClassName());
184  }
185
186  static IOException toIOE(Throwable t) {
187    if (t instanceof IOException) {
188      return (IOException) t;
189    } else {
190      return new IOException(t);
191    }
192  }
193
194  private static String getCallTarget(Address addr, RegionInfo regionInfo) {
195    return "address=" + addr
196      + (regionInfo != null ? ", region=" + regionInfo.getRegionNameAsString() : "");
197  }
198
199  /**
200   * Takes an Exception, the address, and if pertinent, the RegionInfo for the Region we were trying
201   * to connect to and returns an IOException with the input exception as the cause. The new
202   * exception provides the stack trace of the place where the exception is thrown and some extra
203   * diagnostics information.
204   * <p/>
205   * Notice that we will try our best to keep the original exception type when creating a new
206   * exception, especially for the 'connection' exceptions, as it is used to determine whether this
207   * is a network issue or the remote side tells us clearly what is wrong, which is important
208   * deciding whether to retry. If it is not possible to create a new exception with the same type,
209   * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be
210   * created.
211   * @param addr  target address
212   * @param error the relevant exception
213   * @return an exception to throw
214   * @see ClientExceptionsUtil#isConnectionException(Throwable)
215   */
216  static IOException wrapException(Address addr, RegionInfo regionInfo, Throwable error) {
217    if (error instanceof ConnectException) {
218      // connection refused; include the host:port in the error
219      return (IOException) new ConnectException(
220        "Call to " + getCallTarget(addr, regionInfo) + " failed on connection exception: " + error)
221          .initCause(error);
222    } else if (error instanceof SocketTimeoutException) {
223      return (IOException) new SocketTimeoutException(
224        "Call to " + getCallTarget(addr, regionInfo) + " failed because " + error).initCause(error);
225    } else if (error instanceof ConnectionClosingException) {
226      return new ConnectionClosingException(
227        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
228        error);
229    } else if (error instanceof ServerTooBusyException) {
230      // we already have address in the exception message
231      return (IOException) error;
232    } else if (error instanceof DoNotRetryIOException) {
233      // try our best to keep the original exception type
234      try {
235        return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class)
236          .getConstructor(String.class)
237          .newInstance(
238            "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error)
239          .initCause(error);
240      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
241        | InvocationTargetException | NoSuchMethodException | SecurityException e) {
242        // just ignore, will just new a DoNotRetryIOException instead below
243      }
244      return new DoNotRetryIOException(
245        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
246        error);
247    } else if (error instanceof ConnectionClosedException) {
248      return new ConnectionClosedException(
249        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
250        error);
251    } else if (error instanceof CallTimeoutException) {
252      return new CallTimeoutException(
253        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
254        error);
255    } else if (error instanceof ClosedChannelException) {
256      // ClosedChannelException does not have a constructor which takes a String but it is a
257      // connection exception so we keep its original type
258      return (IOException) error;
259    } else if (error instanceof TimeoutException) {
260      // TimeoutException is not an IOException, let's convert it to TimeoutIOException.
261      return new TimeoutIOException(
262        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
263        error);
264    } else {
265      // try our best to keep the original exception type
266      if (error instanceof IOException) {
267        try {
268          return (IOException) error.getClass().asSubclass(IOException.class)
269            .getConstructor(String.class)
270            .newInstance(
271              "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error)
272            .initCause(error);
273        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
274          | InvocationTargetException | NoSuchMethodException | SecurityException e) {
275          // just ignore, will just new an IOException instead below
276        }
277      }
278      return new HBaseIOException(
279        "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error,
280        error);
281    }
282  }
283
284  static void setCancelled(Call call) {
285    call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
286      + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
287      + call.timeout));
288  }
289
290  private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {
291
292    @Override
293    protected MutableInt initialValue() throws Exception {
294      return new MutableInt(0);
295    }
296  };
297
298  static final int MAX_DEPTH = 4;
299
300  static void execute(EventLoop eventLoop, Runnable action) {
301    if (eventLoop.inEventLoop()) {
302      // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel
303      // implementation.
304      MutableInt depth = DEPTH.get();
305      if (depth.intValue() < MAX_DEPTH) {
306        depth.increment();
307        try {
308          action.run();
309        } finally {
310          depth.decrement();
311        }
312      } else {
313        eventLoop.execute(action);
314      }
315    } else {
316      eventLoop.execute(action);
317    }
318  }
319}