001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import io.opentelemetry.api.GlobalOpenTelemetry; 021import io.opentelemetry.context.Context; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.lang.reflect.InvocationTargetException; 025import java.net.ConnectException; 026import java.net.SocketTimeoutException; 027import java.nio.channels.ClosedChannelException; 028import java.util.concurrent.TimeoutException; 029import org.apache.commons.lang3.mutable.MutableInt; 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.HBaseIOException; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 035import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 036import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; 037import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 038import org.apache.hadoop.hbase.net.Address; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.ipc.RemoteException; 042import org.apache.yetus.audience.InterfaceAudience; 043 044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 045import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; 046import org.apache.hbase.thirdparty.com.google.protobuf.Message; 047import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 048import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 049import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal; 050 051import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; 055 056/** 057 * Utility to help ipc'ing. 058 */ 059@InterfaceAudience.Private 060class IPCUtil { 061 062 /** 063 * Write out header, param, and cell block if there is one. 064 * @param dos Stream to write into 065 * @param header to write 066 * @param param to write 067 * @param cellBlock to write 068 * @return Total number of bytes written. 069 * @throws IOException if write action fails 070 */ 071 public static int write(final OutputStream dos, final Message header, final Message param, 072 final ByteBuf cellBlock) throws IOException { 073 // Must calculate total size and write that first so other side can read it all in in one 074 // swoop. This is dictated by how the server is currently written. Server needs to change 075 // if we are to be able to write without the length prefixing. 076 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); 077 if (cellBlock != null) { 078 totalSize += cellBlock.readableBytes(); 079 } 080 return write(dos, header, param, cellBlock, totalSize); 081 } 082 083 private static int write(final OutputStream dos, final Message header, final Message param, 084 final ByteBuf cellBlock, final int totalSize) throws IOException { 085 // I confirmed toBytes does same as DataOutputStream#writeInt. 086 dos.write(Bytes.toBytes(totalSize)); 087 // This allocates a buffer that is the size of the message internally. 088 header.writeDelimitedTo(dos); 089 if (param != null) { 090 param.writeDelimitedTo(dos); 091 } 092 if (cellBlock != null) { 093 cellBlock.readBytes(dos, cellBlock.readableBytes()); 094 } 095 dos.flush(); 096 return totalSize; 097 } 098 099 /** Returns Size on the wire when the two messages are written with writeDelimitedTo */ 100 public static int getTotalSizeWhenWrittenDelimited(Message... messages) { 101 int totalSize = 0; 102 for (Message m : messages) { 103 if (m == null) { 104 continue; 105 } 106 totalSize += m.getSerializedSize(); 107 totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize()); 108 } 109 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); 110 return totalSize; 111 } 112 113 static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) { 114 RequestHeader.Builder builder = RequestHeader.newBuilder(); 115 builder.setCallId(call.id); 116 RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder(); 117 GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), 118 traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value)); 119 builder.setTraceInfo(traceBuilder.build()); 120 builder.setMethodName(call.md.getName()); 121 builder.setRequestParam(call.param != null); 122 if (cellBlockMeta != null) { 123 builder.setCellBlockMeta(cellBlockMeta); 124 } 125 // Only pass priority if there is one set. 126 if (call.priority != HConstants.PRIORITY_UNSET) { 127 builder.setPriority(call.priority); 128 } 129 builder.setTimeout(call.timeout); 130 131 return builder.build(); 132 } 133 134 /** 135 * @param e exception to be wrapped 136 * @return RemoteException made from passed <code>e</code> 137 */ 138 static RemoteException createRemoteException(final ExceptionResponse e) { 139 String innerExceptionClassName = e.getExceptionClassName(); 140 boolean doNotRetry = e.getDoNotRetry(); 141 boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded(); 142 return e.hasHostname() ? 143 // If a hostname then add it to the RemoteWithExtrasException 144 new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), 145 e.getPort(), doNotRetry, serverOverloaded) 146 : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry, 147 serverOverloaded); 148 } 149 150 /** Returns True if the exception is a fatal connection exception. */ 151 static boolean isFatalConnectionException(final ExceptionResponse e) { 152 return e.getExceptionClassName().equals(FatalConnectionException.class.getName()); 153 } 154 155 static IOException toIOE(Throwable t) { 156 if (t instanceof IOException) { 157 return (IOException) t; 158 } else { 159 return new IOException(t); 160 } 161 } 162 163 private static String getCallTarget(Address addr, RegionInfo regionInfo) { 164 return "address=" + addr 165 + (regionInfo != null ? ", region=" + regionInfo.getRegionNameAsString() : ""); 166 } 167 168 /** 169 * Takes an Exception, the address, and if pertinent, the RegionInfo for the Region we were trying 170 * to connect to and returns an IOException with the input exception as the cause. The new 171 * exception provides the stack trace of the place where the exception is thrown and some extra 172 * diagnostics information. 173 * <p/> 174 * Notice that we will try our best to keep the original exception type when creating a new 175 * exception, especially for the 'connection' exceptions, as it is used to determine whether this 176 * is a network issue or the remote side tells us clearly what is wrong, which is important 177 * deciding whether to retry. If it is not possible to create a new exception with the same type, 178 * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be 179 * created. 180 * @param addr target address 181 * @param error the relevant exception 182 * @return an exception to throw 183 * @see ClientExceptionsUtil#isConnectionException(Throwable) 184 */ 185 static IOException wrapException(Address addr, RegionInfo regionInfo, Throwable error) { 186 if (error instanceof ConnectException) { 187 // connection refused; include the host:port in the error 188 return (IOException) new ConnectException( 189 "Call to " + getCallTarget(addr, regionInfo) + " failed on connection exception: " + error) 190 .initCause(error); 191 } else if (error instanceof SocketTimeoutException) { 192 return (IOException) new SocketTimeoutException( 193 "Call to " + getCallTarget(addr, regionInfo) + " failed because " + error).initCause(error); 194 } else if (error instanceof ConnectionClosingException) { 195 return new ConnectionClosingException( 196 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 197 error); 198 } else if (error instanceof ServerTooBusyException) { 199 // we already have address in the exception message 200 return (IOException) error; 201 } else if (error instanceof DoNotRetryIOException) { 202 // try our best to keep the original exception type 203 try { 204 return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class) 205 .getConstructor(String.class) 206 .newInstance( 207 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error) 208 .initCause(error); 209 } catch (InstantiationException | IllegalAccessException | IllegalArgumentException 210 | InvocationTargetException | NoSuchMethodException | SecurityException e) { 211 // just ignore, will just new a DoNotRetryIOException instead below 212 } 213 return new DoNotRetryIOException( 214 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 215 error); 216 } else if (error instanceof ConnectionClosedException) { 217 return new ConnectionClosedException( 218 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 219 error); 220 } else if (error instanceof CallTimeoutException) { 221 return new CallTimeoutException( 222 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 223 error); 224 } else if (error instanceof ClosedChannelException) { 225 // ClosedChannelException does not have a constructor which takes a String but it is a 226 // connection exception so we keep its original type 227 return (IOException) error; 228 } else if (error instanceof TimeoutException) { 229 // TimeoutException is not an IOException, let's convert it to TimeoutIOException. 230 return new TimeoutIOException( 231 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 232 error); 233 } else { 234 // try our best to keep the original exception type 235 if (error instanceof IOException) { 236 try { 237 return (IOException) error.getClass().asSubclass(IOException.class) 238 .getConstructor(String.class) 239 .newInstance( 240 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error) 241 .initCause(error); 242 } catch (InstantiationException | IllegalAccessException | IllegalArgumentException 243 | InvocationTargetException | NoSuchMethodException | SecurityException e) { 244 // just ignore, will just new an IOException instead below 245 } 246 } 247 return new HBaseIOException( 248 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 249 error); 250 } 251 } 252 253 static void setCancelled(Call call) { 254 call.setException(new CallCancelledException(call.toShortString() + ", waitTime=" 255 + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" 256 + call.timeout)); 257 } 258 259 private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() { 260 261 @Override 262 protected MutableInt initialValue() throws Exception { 263 return new MutableInt(0); 264 } 265 }; 266 267 static final int MAX_DEPTH = 4; 268 269 static void execute(EventLoop eventLoop, Runnable action) { 270 if (eventLoop.inEventLoop()) { 271 // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel 272 // implementation. 273 MutableInt depth = DEPTH.get(); 274 if (depth.intValue() < MAX_DEPTH) { 275 depth.increment(); 276 try { 277 action.run(); 278 } finally { 279 depth.decrement(); 280 } 281 } else { 282 eventLoop.execute(action); 283 } 284 } else { 285 eventLoop.execute(action); 286 } 287 } 288}