001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import io.opentelemetry.api.GlobalOpenTelemetry; 021import io.opentelemetry.context.Context; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.lang.reflect.InvocationTargetException; 025import java.net.ConnectException; 026import java.net.SocketTimeoutException; 027import java.nio.channels.ClosedChannelException; 028import java.util.Map; 029import java.util.concurrent.TimeoutException; 030import org.apache.commons.lang3.mutable.MutableInt; 031import org.apache.hadoop.hbase.DoNotRetryIOException; 032import org.apache.hadoop.hbase.HBaseIOException; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 036import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 037import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; 038import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.util.Bytes; 041import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;

/**
 * Utility to help ipc'ing.
 */
@InterfaceAudience.Private
class IPCUtil {

  private static final Logger LOG = LoggerFactory.getLogger(IPCUtil.class);

  /**
   * Write out a length prefix followed by header, param, and cell block if there is one.
   * <p/>
   * The readable bytes of {@code cellBlock} are consumed (transferred via
   * {@link ByteBuf#readBytes(OutputStream, int)}).
   * @param dos       Stream to write into
   * @param header    to write
   * @param param     to write, may be null
   * @param cellBlock to write, may be null
   * @return the size written into the length prefix, i.e. the number of bytes written after the
   *         4-byte prefix itself.
   * @throws IOException              if write action fails
   * @throws IllegalArgumentException if the total size does not fit in the int length prefix
   */
  public static int write(final OutputStream dos, final Message header, final Message param,
    final ByteBuf cellBlock) throws IOException {
    // Must calculate total size and write that first so other side can read it all in in one
    // swoop. This is dictated by how the server is currently written. Server needs to change
    // if we are to be able to write without the length prefixing.
    // Sum in a long: adding the cell block size to the int message size could otherwise wrap
    // around and put a negative length prefix on the wire.
    long totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
    if (cellBlock != null) {
      totalSize += cellBlock.readableBytes();
    }
    Preconditions.checkArgument(totalSize <= Integer.MAX_VALUE,
      "total request size %s does not fit in the int length prefix", totalSize);
    return write(dos, header, param, cellBlock, (int) totalSize);
  }

  /**
   * Writes {@code totalSize} as the length prefix, then the delimited messages and the cell
   * block, and flushes the stream.
   * @return {@code totalSize}
   */
  private static int write(final OutputStream dos, final Message header, final Message param,
    final ByteBuf cellBlock, final int totalSize) throws IOException {
    // I confirmed toBytes does same as DataOutputStream#writeInt.
    dos.write(Bytes.toBytes(totalSize));
    // This allocates a buffer that is the size of the message internally.
    header.writeDelimitedTo(dos);
    if (param != null) {
      param.writeDelimitedTo(dos);
    }
    if (cellBlock != null) {
      cellBlock.readBytes(dos, cellBlock.readableBytes());
    }
    dos.flush();
    return totalSize;
  }

  /**
   * Computes the size on the wire when the given messages are each written with
   * {@code writeDelimitedTo}, i.e. the serialized size of each message plus its varint length
   * prefix. Null messages are skipped.
   * @param messages the messages to measure; null entries are ignored
   * @return the total delimited size
   * @throws IllegalArgumentException if the total is not strictly less than
   *                                  {@link Integer#MAX_VALUE}
   */
  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
    // Accumulate in a long: with an int accumulator several large messages could wrap negative
    // and silently pass the bound check below.
    long totalSize = 0;
    for (Message m : messages) {
      if (m == null) {
        continue;
      }
      int serializedSize = m.getSerializedSize();
      totalSize += serializedSize;
      totalSize += CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
    }
    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE,
      "total delimited size %s is too large", totalSize);
    return (int) totalSize;
  }

  /**
   * Builds the request header for {@code call}: call id, injected tracing context, method name,
   * whether a request param is present, optional cell block meta, priority (only when set),
   * attributes (only when non-empty) and timeout.
   */
  static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
    RequestHeader.Builder builder = RequestHeader.newBuilder();
    builder.setCallId(call.id);
    RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
    GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
      traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
    builder.setTraceInfo(traceBuilder.build());
    builder.setMethodName(call.md.getName());
    builder.setRequestParam(call.param != null);
    if (cellBlockMeta != null) {
      builder.setCellBlockMeta(cellBlockMeta);
    }
    // Only pass priority if there is one set.
    if (call.priority != HConstants.PRIORITY_UNSET) {
      builder.setPriority(call.priority);
    }
    if (call.attributes != null && !call.attributes.isEmpty()) {
      // The builder is reused; both fields are overwritten on every iteration.
      HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder();
      for (Map.Entry<String, byte[]> attribute : call.attributes.entrySet()) {
        attributeBuilder.setName(attribute.getKey());
        attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
        builder.addAttribute(attributeBuilder.build());
      }
    }
    builder.setTimeout(call.timeout);

    return builder.build();
  }

  /**
   * @param e the exception response received from the remote side
   * @return RemoteException made from passed <code>e</code>, carrying hostname/port when the
   *         response has a hostname, plus the do-not-retry and server-overloaded flags
   */
  static RemoteException createRemoteException(final ExceptionResponse e) {
    String innerExceptionClassName =
      ShadedPrefixUtil.getInstance().resolveShading(e.getExceptionClassName());
    boolean doNotRetry = e.getDoNotRetry();
    boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded();
    return e.hasHostname() ?
    // If a hostname then add it to the RemoteWithExtrasException
      new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
        e.getPort(), doNotRetry, serverOverloaded)
      : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry,
        serverOverloaded);
  }

  /** Returns True if the exception is a fatal connection exception. */
  static boolean isFatalConnectionException(ExceptionResponse e) {
    String exceptionClassName =
      ShadedPrefixUtil.getInstance().resolveShading(e.getExceptionClassName());
    if (FatalConnectionException.class.getName().equals(exceptionClassName)) {
      return true;
    }
    // try our best to check for sub classes of FatalConnectionException
    try {
      return exceptionClassName != null && FatalConnectionException.class
        .isAssignableFrom(Class.forName(exceptionClassName, false, IPCUtil.class.getClassLoader()));
      // Class.forName may throw ExceptionInInitializerError so we have to catch Throwable here
    } catch (Throwable t) {
      LOG.debug("Can not get class object for {}", exceptionClassName, t);
      return false;
    }
  }

  /** Returns true if {@code e} is a RemoteException whose class name is SecurityNotEnabledException. */
  static boolean isSecurityNotEnabledException(IOException e) {
    return e instanceof RemoteException
      && SecurityNotEnabledException.class.getName().equals(((RemoteException) e).getClassName());
  }

  /** Returns {@code t} itself if it is an IOException, otherwise a new IOException wrapping it. */
  static IOException toIOE(Throwable t) {
    if (t instanceof IOException) {
      return (IOException) t;
    } else {
      return new IOException(t);
    }
  }

  private static String getCallTarget(Address addr, RegionInfo regionInfo) {
    return "address=" + addr
      + (regionInfo != null ? ", region=" + regionInfo.getRegionNameAsString() : "");
  }

  /** Builds the common "Call to ... failed on local exception: ..." message. */
  private static String localExceptionMessage(Address addr, RegionInfo regionInfo,
    Throwable error) {
    return "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error;
  }

  /**
   * Tries to create a new exception of {@code error}'s own runtime class, using its public
   * single-{@code String} constructor, with {@code message} as message and {@code error} as cause.
   * @param baseType the type {@code error} is known to be an instance of
   * @return the new exception, or {@code null} if it could not be created
   */
  private static IOException tryCreateSameType(Class<? extends IOException> baseType,
    Throwable error, String message) {
    try {
      IOException created =
        error.getClass().asSubclass(baseType).getConstructor(String.class).newInstance(message);
      created.initCause(error);
      return created;
    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
      | InvocationTargetException | NoSuchMethodException | SecurityException
      | IllegalStateException ignored) {
      // IllegalStateException comes from initCause when the String constructor already
      // initialized the cause (e.g. it delegates to super(msg, null)). In every case the caller
      // falls back to a generic wrapper type.
      return null;
    }
  }

  /**
   * Takes an Exception, the address, and if pertinent, the RegionInfo for the Region we were trying
   * to connect to and returns an IOException with the input exception as the cause. The new
   * exception provides the stack trace of the place where the exception is thrown and some extra
   * diagnostics information.
   * <p/>
   * Notice that we will try our best to keep the original exception type when creating a new
   * exception, especially for the 'connection' exceptions, as it is used to determine whether this
   * is a network issue or the remote side tells us clearly what is wrong, which is important
   * deciding whether to retry. If it is not possible to create a new exception with the same type,
   * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be
   * created.
   * @param addr       target address
   * @param regionInfo the region the call targeted, or null if not pertinent
   * @param error      the relevant exception
   * @return an exception to throw
   * @see ClientExceptionsUtil#isConnectionException(Throwable)
   */
  static IOException wrapException(Address addr, RegionInfo regionInfo, Throwable error) {
    if (error instanceof ConnectException) {
      // connection refused; include the host:port in the error
      return (IOException) new ConnectException(
        "Call to " + getCallTarget(addr, regionInfo) + " failed on connection exception: " + error)
        .initCause(error);
    } else if (error instanceof SocketTimeoutException) {
      return (IOException) new SocketTimeoutException(
        "Call to " + getCallTarget(addr, regionInfo) + " failed because " + error).initCause(error);
    } else if (error instanceof ConnectionClosingException) {
      return new ConnectionClosingException(localExceptionMessage(addr, regionInfo, error), error);
    } else if (error instanceof ServerTooBusyException) {
      // we already have address in the exception message
      return (IOException) error;
    } else if (error instanceof DoNotRetryIOException) {
      // try our best to keep the original exception type
      String message = localExceptionMessage(addr, regionInfo, error);
      IOException sameType = tryCreateSameType(DoNotRetryIOException.class, error, message);
      return sameType != null ? sameType : new DoNotRetryIOException(message, error);
    } else if (error instanceof ConnectionClosedException) {
      return new ConnectionClosedException(localExceptionMessage(addr, regionInfo, error), error);
    } else if (error instanceof CallTimeoutException) {
      return new CallTimeoutException(localExceptionMessage(addr, regionInfo, error), error);
    } else if (error instanceof ClosedChannelException) {
      // ClosedChannelException does not have a constructor which takes a String but it is a
      // connection exception so we keep its original type
      return (IOException) error;
    } else if (error instanceof TimeoutException) {
      // TimeoutException is not an IOException, let's convert it to TimeoutIOException.
      return new TimeoutIOException(localExceptionMessage(addr, regionInfo, error), error);
    } else {
      String message = localExceptionMessage(addr, regionInfo, error);
      // try our best to keep the original exception type
      if (error instanceof IOException) {
        IOException sameType = tryCreateSameType(IOException.class, error, message);
        if (sameType != null) {
          return sameType;
        }
      }
      return new HBaseIOException(message, error);
    }
  }

  /** Completes {@code call} with a CallCancelledException describing wait time and timeout. */
  static void setCancelled(Call call) {
    call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
      + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
      + call.timeout));
  }

  /** Per-thread count of nested inline {@link #execute} invocations. */
  private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {

    @Override
    protected MutableInt initialValue() throws Exception {
      return new MutableInt(0);
    }
  };

  static final int MAX_DEPTH = 4;

  /**
   * Runs {@code action} on {@code eventLoop}. If the current thread is already in the event loop
   * and the inline nesting depth is below {@link #MAX_DEPTH}, the action is run directly;
   * otherwise it is submitted via {@link EventLoop#execute(Runnable)}.
   */
  static void execute(EventLoop eventLoop, Runnable action) {
    if (eventLoop.inEventLoop()) {
      // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel
      // implementation.
      MutableInt depth = DEPTH.get();
      if (depth.intValue() < MAX_DEPTH) {
        depth.increment();
        try {
          action.run();
        } finally {
          depth.decrement();
        }
      } else {
        eventLoop.execute(action);
      }
    } else {
      eventLoop.execute(action);
    }
  }
}