001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import io.opentelemetry.api.GlobalOpenTelemetry; 021import io.opentelemetry.context.Context; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.lang.reflect.InvocationTargetException; 025import java.net.ConnectException; 026import java.net.SocketTimeoutException; 027import java.nio.channels.ClosedChannelException; 028import java.util.concurrent.TimeoutException; 029import org.apache.commons.lang3.mutable.MutableInt; 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.HBaseIOException; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 035import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 036import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; 037import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 038import org.apache.hadoop.hbase.net.Address; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.ipc.RemoteException; 042import org.apache.yetus.audience.InterfaceAudience; 043 044import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 045import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; 046import org.apache.hbase.thirdparty.com.google.protobuf.Message; 047import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 048import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 049import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal; 050 051import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; 055 056/** 057 * Utility to help ipc'ing. 058 */ 059@InterfaceAudience.Private 060class IPCUtil { 061 062 /** 063 * Write out header, param, and cell block if there is one. 064 * @param dos Stream to write into 065 * @param header to write 066 * @param param to write 067 * @param cellBlock to write 068 * @return Total number of bytes written. 069 * @throws IOException if write action fails 070 */ 071 public static int write(final OutputStream dos, final Message header, final Message param, 072 final ByteBuf cellBlock) throws IOException { 073 // Must calculate total size and write that first so other side can read it all in in one 074 // swoop. This is dictated by how the server is currently written. Server needs to change 075 // if we are to be able to write without the length prefixing. 076 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); 077 if (cellBlock != null) { 078 totalSize += cellBlock.readableBytes(); 079 } 080 return write(dos, header, param, cellBlock, totalSize); 081 } 082 083 private static int write(final OutputStream dos, final Message header, final Message param, 084 final ByteBuf cellBlock, final int totalSize) throws IOException { 085 // I confirmed toBytes does same as DataOutputStream#writeInt. 086 dos.write(Bytes.toBytes(totalSize)); 087 // This allocates a buffer that is the size of the message internally. 088 header.writeDelimitedTo(dos); 089 if (param != null) { 090 param.writeDelimitedTo(dos); 091 } 092 if (cellBlock != null) { 093 cellBlock.readBytes(dos, cellBlock.readableBytes()); 094 } 095 dos.flush(); 096 return totalSize; 097 } 098 099 /** 100 * @return Size on the wire when the two messages are written with writeDelimitedTo 101 */ 102 public static int getTotalSizeWhenWrittenDelimited(Message... messages) { 103 int totalSize = 0; 104 for (Message m : messages) { 105 if (m == null) { 106 continue; 107 } 108 totalSize += m.getSerializedSize(); 109 totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize()); 110 } 111 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); 112 return totalSize; 113 } 114 115 static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) { 116 RequestHeader.Builder builder = RequestHeader.newBuilder(); 117 builder.setCallId(call.id); 118 RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder(); 119 GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), 120 traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value)); 121 builder.setTraceInfo(traceBuilder.build()); 122 builder.setMethodName(call.md.getName()); 123 builder.setRequestParam(call.param != null); 124 if (cellBlockMeta != null) { 125 builder.setCellBlockMeta(cellBlockMeta); 126 } 127 // Only pass priority if there is one set. 128 if (call.priority != HConstants.PRIORITY_UNSET) { 129 builder.setPriority(call.priority); 130 } 131 builder.setTimeout(call.timeout); 132 133 return builder.build(); 134 } 135 136 /** 137 * @param e exception to be wrapped 138 * @return RemoteException made from passed <code>e</code> 139 */ 140 static RemoteException createRemoteException(final ExceptionResponse e) { 141 String innerExceptionClassName = e.getExceptionClassName(); 142 boolean doNotRetry = e.getDoNotRetry(); 143 boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded(); 144 return e.hasHostname() ? 145 // If a hostname then add it to the RemoteWithExtrasException 146 new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), 147 e.getPort(), doNotRetry, serverOverloaded) 148 : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry, 149 serverOverloaded); 150 } 151 152 /** 153 * @return True if the exception is a fatal connection exception. 154 */ 155 static boolean isFatalConnectionException(final ExceptionResponse e) { 156 return e.getExceptionClassName().equals(FatalConnectionException.class.getName()); 157 } 158 159 static IOException toIOE(Throwable t) { 160 if (t instanceof IOException) { 161 return (IOException) t; 162 } else { 163 return new IOException(t); 164 } 165 } 166 167 private static String getCallTarget(Address addr, RegionInfo regionInfo) { 168 return "address=" + addr 169 + (regionInfo != null ? ", region=" + regionInfo.getRegionNameAsString() : ""); 170 } 171 172 /** 173 * Takes an Exception, the address, and if pertinent, the RegionInfo for the Region we were trying 174 * to connect to and returns an IOException with the input exception as the cause. The new 175 * exception provides the stack trace of the place where the exception is thrown and some extra 176 * diagnostics information. 177 * <p/> 178 * Notice that we will try our best to keep the original exception type when creating a new 179 * exception, especially for the 'connection' exceptions, as it is used to determine whether this 180 * is a network issue or the remote side tells us clearly what is wrong, which is important 181 * deciding whether to retry. If it is not possible to create a new exception with the same type, 182 * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be 183 * created. 184 * @param addr target address 185 * @param error the relevant exception 186 * @return an exception to throw 187 * @see ClientExceptionsUtil#isConnectionException(Throwable) 188 */ 189 static IOException wrapException(Address addr, RegionInfo regionInfo, Throwable error) { 190 if (error instanceof ConnectException) { 191 // connection refused; include the host:port in the error 192 return (IOException) new ConnectException( 193 "Call to " + getCallTarget(addr, regionInfo) + " failed on connection exception: " + error) 194 .initCause(error); 195 } else if (error instanceof SocketTimeoutException) { 196 return (IOException) new SocketTimeoutException( 197 "Call to " + getCallTarget(addr, regionInfo) + " failed because " + error).initCause(error); 198 } else if (error instanceof ConnectionClosingException) { 199 return new ConnectionClosingException( 200 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 201 error); 202 } else if (error instanceof ServerTooBusyException) { 203 // we already have address in the exception message 204 return (IOException) error; 205 } else if (error instanceof DoNotRetryIOException) { 206 // try our best to keep the original exception type 207 try { 208 return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class) 209 .getConstructor(String.class) 210 .newInstance( 211 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error) 212 .initCause(error); 213 } catch (InstantiationException | IllegalAccessException | IllegalArgumentException 214 | InvocationTargetException | NoSuchMethodException | SecurityException e) { 215 // just ignore, will just new a DoNotRetryIOException instead below 216 } 217 return new DoNotRetryIOException( 218 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 219 error); 220 } else if (error instanceof ConnectionClosedException) { 221 return new ConnectionClosedException( 222 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 223 error); 224 } else if (error instanceof CallTimeoutException) { 225 return new CallTimeoutException( 226 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 227 error); 228 } else if (error instanceof ClosedChannelException) { 229 // ClosedChannelException does not have a constructor which takes a String but it is a 230 // connection exception so we keep its original type 231 return (IOException) error; 232 } else if (error instanceof TimeoutException) { 233 // TimeoutException is not an IOException, let's convert it to TimeoutIOException. 234 return new TimeoutIOException( 235 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 236 error); 237 } else { 238 // try our best to keep the original exception type 239 if (error instanceof IOException) { 240 try { 241 return (IOException) error.getClass().asSubclass(IOException.class) 242 .getConstructor(String.class) 243 .newInstance( 244 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error) 245 .initCause(error); 246 } catch (InstantiationException | IllegalAccessException | IllegalArgumentException 247 | InvocationTargetException | NoSuchMethodException | SecurityException e) { 248 // just ignore, will just new an IOException instead below 249 } 250 } 251 return new HBaseIOException( 252 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 253 error); 254 } 255 } 256 257 static void setCancelled(Call call) { 258 call.setException(new CallCancelledException(call.toShortString() + ", waitTime=" 259 + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" 260 + call.timeout)); 261 } 262 263 private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() { 264 265 @Override 266 protected MutableInt initialValue() throws Exception { 267 return new MutableInt(0); 268 } 269 }; 270 271 static final int MAX_DEPTH = 4; 272 273 static void execute(EventLoop eventLoop, Runnable action) { 274 if (eventLoop.inEventLoop()) { 275 // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel 276 // implementation. 277 MutableInt depth = DEPTH.get(); 278 if (depth.intValue() < MAX_DEPTH) { 279 depth.increment(); 280 try { 281 action.run(); 282 } finally { 283 depth.decrement(); 284 } 285 } else { 286 eventLoop.execute(action); 287 } 288 } else { 289 eventLoop.execute(action); 290 } 291 } 292}