001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import io.opentelemetry.api.GlobalOpenTelemetry; 021import io.opentelemetry.context.Context; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.lang.reflect.InvocationTargetException; 025import java.net.ConnectException; 026import java.net.SocketTimeoutException; 027import java.nio.channels.ClosedChannelException; 028import java.util.Map; 029import java.util.concurrent.TimeoutException; 030import org.apache.commons.lang3.mutable.MutableInt; 031import org.apache.hadoop.hbase.DoNotRetryIOException; 032import org.apache.hadoop.hbase.HBaseIOException; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 036import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 037import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; 038import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 042import org.apache.hadoop.ipc.RemoteException; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 048import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; 049import org.apache.hbase.thirdparty.com.google.protobuf.Message; 050import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 051import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 052import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 053import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal; 054 055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; 060 061/** 062 * Utility to help ipc'ing. 063 */ 064@InterfaceAudience.Private 065class IPCUtil { 066 067 private static final Logger LOG = LoggerFactory.getLogger(IPCUtil.class); 068 069 /** 070 * Write out header, param, and cell block if there is one. 071 * @param dos Stream to write into 072 * @param header to write 073 * @param param to write 074 * @param cellBlock to write 075 * @return Total number of bytes written. 076 * @throws IOException if write action fails 077 */ 078 public static int write(final OutputStream dos, final Message header, final Message param, 079 final ByteBuf cellBlock) throws IOException { 080 // Must calculate total size and write that first so other side can read it all in in one 081 // swoop. This is dictated by how the server is currently written. Server needs to change 082 // if we are to be able to write without the length prefixing. 083 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); 084 if (cellBlock != null) { 085 totalSize += cellBlock.readableBytes(); 086 } 087 return write(dos, header, param, cellBlock, totalSize); 088 } 089 090 private static int write(final OutputStream dos, final Message header, final Message param, 091 final ByteBuf cellBlock, final int totalSize) throws IOException { 092 // I confirmed toBytes does same as DataOutputStream#writeInt. 093 dos.write(Bytes.toBytes(totalSize)); 094 // This allocates a buffer that is the size of the message internally. 095 header.writeDelimitedTo(dos); 096 if (param != null) { 097 param.writeDelimitedTo(dos); 098 } 099 if (cellBlock != null) { 100 cellBlock.readBytes(dos, cellBlock.readableBytes()); 101 } 102 dos.flush(); 103 return totalSize; 104 } 105 106 /** Returns Size on the wire when the two messages are written with writeDelimitedTo */ 107 public static int getTotalSizeWhenWrittenDelimited(Message... messages) { 108 int totalSize = 0; 109 for (Message m : messages) { 110 if (m == null) { 111 continue; 112 } 113 totalSize += m.getSerializedSize(); 114 totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize()); 115 } 116 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); 117 return totalSize; 118 } 119 120 static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) { 121 RequestHeader.Builder builder = RequestHeader.newBuilder(); 122 builder.setCallId(call.id); 123 RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder(); 124 GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), 125 traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value)); 126 builder.setTraceInfo(traceBuilder.build()); 127 builder.setMethodName(call.md.getName()); 128 builder.setRequestParam(call.param != null); 129 if (cellBlockMeta != null) { 130 builder.setCellBlockMeta(cellBlockMeta); 131 } 132 // Only pass priority if there is one set. 133 if (call.priority != HConstants.PRIORITY_UNSET) { 134 builder.setPriority(call.priority); 135 } 136 if (call.attributes != null && !call.attributes.isEmpty()) { 137 HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder(); 138 for (Map.Entry<String, byte[]> attribute : call.attributes.entrySet()) { 139 attributeBuilder.setName(attribute.getKey()); 140 attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue())); 141 builder.addAttribute(attributeBuilder.build()); 142 } 143 } 144 builder.setTimeout(call.timeout); 145 146 return builder.build(); 147 } 148 149 /** 150 * @param e exception to be wrapped 151 * @return RemoteException made from passed <code>e</code> 152 */ 153 static RemoteException createRemoteException(final ExceptionResponse e) { 154 String innerExceptionClassName = e.getExceptionClassName(); 155 boolean doNotRetry = e.getDoNotRetry(); 156 boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded(); 157 return e.hasHostname() ? 158 // If a hostname then add it to the RemoteWithExtrasException 159 new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), 160 e.getPort(), doNotRetry, serverOverloaded) 161 : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry, 162 serverOverloaded); 163 } 164 165 /** Returns True if the exception is a fatal connection exception. */ 166 static boolean isFatalConnectionException(ExceptionResponse e) { 167 if (e.getExceptionClassName().equals(FatalConnectionException.class.getName())) { 168 return true; 169 } 170 // try our best to check for sub classes of FatalConnectionException 171 try { 172 return e.getExceptionClassName() != null && FatalConnectionException.class.isAssignableFrom( 173 Class.forName(e.getExceptionClassName(), false, IPCUtil.class.getClassLoader())); 174 // Class.forName may throw ExceptionInInitializerError so we have to catch Throwable here 175 } catch (Throwable t) { 176 LOG.debug("Can not get class object for {}", e.getExceptionClassName(), t); 177 return false; 178 } 179 } 180 181 static boolean isSecurityNotEnabledException(IOException e) { 182 return e instanceof RemoteException 183 && SecurityNotEnabledException.class.getName().equals(((RemoteException) e).getClassName()); 184 } 185 186 static IOException toIOE(Throwable t) { 187 if (t instanceof IOException) { 188 return (IOException) t; 189 } else { 190 return new IOException(t); 191 } 192 } 193 194 private static String getCallTarget(Address addr, RegionInfo regionInfo) { 195 return "address=" + addr 196 + (regionInfo != null ? ", region=" + regionInfo.getRegionNameAsString() : ""); 197 } 198 199 /** 200 * Takes an Exception, the address, and if pertinent, the RegionInfo for the Region we were trying 201 * to connect to and returns an IOException with the input exception as the cause. The new 202 * exception provides the stack trace of the place where the exception is thrown and some extra 203 * diagnostics information. 204 * <p/> 205 * Notice that we will try our best to keep the original exception type when creating a new 206 * exception, especially for the 'connection' exceptions, as it is used to determine whether this 207 * is a network issue or the remote side tells us clearly what is wrong, which is important 208 * deciding whether to retry. If it is not possible to create a new exception with the same type, 209 * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be 210 * created. 211 * @param addr target address 212 * @param error the relevant exception 213 * @return an exception to throw 214 * @see ClientExceptionsUtil#isConnectionException(Throwable) 215 */ 216 static IOException wrapException(Address addr, RegionInfo regionInfo, Throwable error) { 217 if (error instanceof ConnectException) { 218 // connection refused; include the host:port in the error 219 return (IOException) new ConnectException( 220 "Call to " + getCallTarget(addr, regionInfo) + " failed on connection exception: " + error) 221 .initCause(error); 222 } else if (error instanceof SocketTimeoutException) { 223 return (IOException) new SocketTimeoutException( 224 "Call to " + getCallTarget(addr, regionInfo) + " failed because " + error).initCause(error); 225 } else if (error instanceof ConnectionClosingException) { 226 return new ConnectionClosingException( 227 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 228 error); 229 } else if (error instanceof ServerTooBusyException) { 230 // we already have address in the exception message 231 return (IOException) error; 232 } else if (error instanceof DoNotRetryIOException) { 233 // try our best to keep the original exception type 234 try { 235 return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class) 236 .getConstructor(String.class) 237 .newInstance( 238 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error) 239 .initCause(error); 240 } catch (InstantiationException | IllegalAccessException | IllegalArgumentException 241 | InvocationTargetException | NoSuchMethodException | SecurityException e) { 242 // just ignore, will just new a DoNotRetryIOException instead below 243 } 244 return new DoNotRetryIOException( 245 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 246 error); 247 } else if (error instanceof ConnectionClosedException) { 248 return new ConnectionClosedException( 249 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 250 error); 251 } else if (error instanceof CallTimeoutException) { 252 return new CallTimeoutException( 253 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 254 error); 255 } else if (error instanceof ClosedChannelException) { 256 // ClosedChannelException does not have a constructor which takes a String but it is a 257 // connection exception so we keep its original type 258 return (IOException) error; 259 } else if (error instanceof TimeoutException) { 260 // TimeoutException is not an IOException, let's convert it to TimeoutIOException. 261 return new TimeoutIOException( 262 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 263 error); 264 } else { 265 // try our best to keep the original exception type 266 if (error instanceof IOException) { 267 try { 268 return (IOException) error.getClass().asSubclass(IOException.class) 269 .getConstructor(String.class) 270 .newInstance( 271 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error) 272 .initCause(error); 273 } catch (InstantiationException | IllegalAccessException | IllegalArgumentException 274 | InvocationTargetException | NoSuchMethodException | SecurityException e) { 275 // just ignore, will just new an IOException instead below 276 } 277 } 278 return new HBaseIOException( 279 "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error, 280 error); 281 } 282 } 283 284 static void setCancelled(Call call) { 285 call.setException(new CallCancelledException(call.toShortString() + ", waitTime=" 286 + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" 287 + call.timeout)); 288 } 289 290 private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() { 291 292 @Override 293 protected MutableInt initialValue() throws Exception { 294 return new MutableInt(0); 295 } 296 }; 297 298 static final int MAX_DEPTH = 4; 299 300 static void execute(EventLoop eventLoop, Runnable action) { 301 if (eventLoop.inEventLoop()) { 302 // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel 303 // implementation. 304 MutableInt depth = DEPTH.get(); 305 if (depth.intValue() < MAX_DEPTH) { 306 depth.increment(); 307 try { 308 action.run(); 309 } finally { 310 depth.decrement(); 311 } 312 } else { 313 eventLoop.execute(action); 314 } 315 } else { 316 eventLoop.execute(action); 317 } 318 } 319}