001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import io.opentelemetry.api.GlobalOpenTelemetry; 021import io.opentelemetry.context.Context; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.lang.reflect.InvocationTargetException; 025import java.net.ConnectException; 026import java.net.SocketTimeoutException; 027import java.nio.channels.ClosedChannelException; 028import java.util.Map; 029import java.util.concurrent.TimeoutException; 030import org.apache.commons.lang3.mutable.MutableInt; 031import org.apache.hadoop.hbase.DoNotRetryIOException; 032import org.apache.hadoop.hbase.HBaseIOException; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.client.RegionInfo; 035import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 036import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 037import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; 038import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 039import org.apache.hadoop.hbase.net.Address; 040import org.apache.hadoop.hbase.util.Bytes; 041import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;

/**
 * Utility to help ipc'ing.
 */
@InterfaceAudience.Private
class IPCUtil {

  /**
   * Write out header, param, and cell block if there is one.
   * <p/>
   * The payload is preceded by a 4-byte length prefix holding the total payload size, so the
   * receiving side can read the whole request in one go.
   * @param dos       Stream to write into
   * @param header    to write
   * @param param     to write; may be {@code null}
   * @param cellBlock to write; may be {@code null}. Its readable bytes are consumed.
   * @return Total number of payload bytes written, not counting the 4-byte length prefix.
   * @throws IOException              if write action fails
   * @throws IllegalArgumentException if the total payload size does not fit in an {@code int}
   */
  public static int write(final OutputStream dos, final Message header, final Message param,
    final ByteBuf cellBlock) throws IOException {
    // Must calculate total size and write that first so other side can read it all in one
    // swoop. This is dictated by how the server is currently written. Server needs to change
    // if we are to be able to write without the length prefixing.
    // Accumulate in a long: adding a large cell block to an int could silently wrap around and
    // put a negative length prefix on the wire.
    long totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
    if (cellBlock != null) {
      totalSize += cellBlock.readableBytes();
    }
    Preconditions.checkArgument(totalSize <= Integer.MAX_VALUE,
      "Total request size %s does not fit in the int length prefix", totalSize);
    return write(dos, header, param, cellBlock, (int) totalSize);
  }

  /**
   * Writes the length prefix followed by the delimited header, the optional delimited param and
   * the optional cell block, then flushes the stream.
   * @return {@code totalSize}, unchanged
   */
  private static int write(final OutputStream dos, final Message header, final Message param,
    final ByteBuf cellBlock, final int totalSize) throws IOException {
    // I confirmed toBytes does same as DataOutputStream#writeInt.
    dos.write(Bytes.toBytes(totalSize));
    // This allocates a buffer that is the size of the message internally.
    header.writeDelimitedTo(dos);
    if (param != null) {
      param.writeDelimitedTo(dos);
    }
    if (cellBlock != null) {
      cellBlock.readBytes(dos, cellBlock.readableBytes());
    }
    dos.flush();
    return totalSize;
  }

  /**
   * Returns the size on the wire when each of the given messages is written with
   * {@code writeDelimitedTo}: each message's serialized size plus its varint length prefix.
   * {@code null} messages are skipped.
   * @throws IllegalArgumentException if the total is not less than {@link Integer#MAX_VALUE}
   */
  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
    // Sum in a long so the bound check below can actually detect int overflow; an int sum
    // would wrap to a negative value and pass the check.
    long totalSize = 0;
    for (Message m : messages) {
      if (m == null) {
        continue;
      }
      int serializedSize = m.getSerializedSize();
      totalSize += serializedSize;
      totalSize += CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
    }
    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE,
      "Total delimited size %s is too large", totalSize);
    return (int) totalSize;
  }

  /**
   * Builds the RPC request header for {@code call}: call id, injected OpenTelemetry trace
   * context, method name, whether a request param is present, optional cell block meta,
   * priority (only when set), request attributes (only when non-empty) and timeout.
   */
  static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
    RequestHeader.Builder builder = RequestHeader.newBuilder();
    builder.setCallId(call.id);
    RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
    GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
      traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
    builder.setTraceInfo(traceBuilder.build());
    builder.setMethodName(call.md.getName());
    builder.setRequestParam(call.param != null);
    if (cellBlockMeta != null) {
      builder.setCellBlockMeta(cellBlockMeta);
    }
    // Only pass priority if there is one set.
    if (call.priority != HConstants.PRIORITY_UNSET) {
      builder.setPriority(call.priority);
    }
    if (call.attributes != null && !call.attributes.isEmpty()) {
      // The pair builder is reused; setName/setValue overwrite both fields on every iteration.
      HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder();
      for (Map.Entry<String, byte[]> attribute : call.attributes.entrySet()) {
        attributeBuilder.setName(attribute.getKey());
        attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
        builder.addAttribute(attributeBuilder.build());
      }
    }
    builder.setTimeout(call.timeout);

    return builder.build();
  }

  /**
   * Converts an exception response received from the server into a client-side exception.
   * @param e the exception response to convert
   * @return a {@link RemoteWithExtrasException} carrying the remote class name, stack trace,
   *         do-not-retry and server-overloaded flags, plus hostname/port when present
   */
  static RemoteException createRemoteException(final ExceptionResponse e) {
    String innerExceptionClassName = e.getExceptionClassName();
    boolean doNotRetry = e.getDoNotRetry();
    boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded();
    return e.hasHostname() ?
    // If a hostname then add it to the RemoteWithExtrasException
      new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
        e.getPort(), doNotRetry, serverOverloaded)
      : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry,
        serverOverloaded);
  }

  /** Returns True if the exception is a fatal connection exception. */
  static boolean isFatalConnectionException(final ExceptionResponse e) {
    return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
  }

  /** Returns {@code t} itself if it is an {@link IOException}, otherwise wraps it in one. */
  static IOException toIOE(Throwable t) {
    if (t instanceof IOException) {
      return (IOException) t;
    } else {
      return new IOException(t);
    }
  }

  /** Describes the call target as {@code address=...} plus {@code , region=...} if known. */
  private static String getCallTarget(Address addr, RegionInfo regionInfo) {
    return "address=" + addr
      + (regionInfo != null ? ", region=" + regionInfo.getRegionNameAsString() : "");
  }

  /** Builds the standard "failed on local exception" message used by most wrapping branches. */
  private static String localExceptionMessage(Address addr, RegionInfo regionInfo,
    Throwable error) {
    return "Call to " + getCallTarget(addr, regionInfo) + " failed on local exception: " + error;
  }

  /**
   * Tries to create a new exception of the same concrete class as {@code error}, using its public
   * {@code (String)} constructor, with {@code error} set as the cause.
   * @param baseType the type {@code error} is known to be an instance of
   * @param error    the exception whose type should be kept
   * @param message  message for the new exception
   * @return the new exception, or {@code null} when that is not possible. This happens when there
   *         is no accessible {@code (String)} constructor, the class is abstract, the constructor
   *         throws, or the constructor already initialized a cause so {@code initCause} refuses.
   */
  private static IOException recreateWithSameType(Class<? extends IOException> baseType,
    Throwable error, String message) {
    try {
      return (IOException) error.getClass().asSubclass(baseType).getConstructor(String.class)
        .newInstance(message).initCause(error);
    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
      | InvocationTargetException | NoSuchMethodException | SecurityException
      | IllegalStateException e) {
      // Best effort only: the caller falls back to a generic wrapper type.
      return null;
    }
  }

  /**
   * Takes an Exception, the address, and if pertinent, the RegionInfo for the Region we were trying
   * to connect to and returns an IOException with the input exception as the cause. The new
   * exception provides the stack trace of the place where the exception is thrown and some extra
   * diagnostics information.
   * <p/>
   * Notice that we will try our best to keep the original exception type when creating a new
   * exception, especially for the 'connection' exceptions, as it is used to determine whether this
   * is a network issue or the remote side tells us clearly what is wrong, which is important
   * deciding whether to retry. If it is not possible to create a new exception with the same type,
   * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be
   * created.
   * @param addr       target address
   * @param regionInfo region being called, or {@code null} if not pertinent
   * @param error      the relevant exception
   * @return an exception to throw
   * @see ClientExceptionsUtil#isConnectionException(Throwable)
   */
  static IOException wrapException(Address addr, RegionInfo regionInfo, Throwable error) {
    if (error instanceof ConnectException) {
      // connection refused; include the host:port in the error
      return (IOException) new ConnectException(
        "Call to " + getCallTarget(addr, regionInfo) + " failed on connection exception: " + error)
        .initCause(error);
    } else if (error instanceof SocketTimeoutException) {
      return (IOException) new SocketTimeoutException(
        "Call to " + getCallTarget(addr, regionInfo) + " failed because " + error).initCause(error);
    } else if (error instanceof ConnectionClosingException) {
      return new ConnectionClosingException(localExceptionMessage(addr, regionInfo, error), error);
    } else if (error instanceof ServerTooBusyException) {
      // we already have address in the exception message
      return (IOException) error;
    } else if (error instanceof DoNotRetryIOException) {
      // try our best to keep the original exception type
      String message = localExceptionMessage(addr, regionInfo, error);
      IOException sameType = recreateWithSameType(DoNotRetryIOException.class, error, message);
      return sameType != null ? sameType : new DoNotRetryIOException(message, error);
    } else if (error instanceof ConnectionClosedException) {
      return new ConnectionClosedException(localExceptionMessage(addr, regionInfo, error), error);
    } else if (error instanceof CallTimeoutException) {
      return new CallTimeoutException(localExceptionMessage(addr, regionInfo, error), error);
    } else if (error instanceof ClosedChannelException) {
      // ClosedChannelException does not have a constructor which takes a String but it is a
      // connection exception so we keep its original type
      return (IOException) error;
    } else if (error instanceof TimeoutException) {
      // TimeoutException is not an IOException, let's convert it to TimeoutIOException.
      return new TimeoutIOException(localExceptionMessage(addr, regionInfo, error), error);
    } else {
      String message = localExceptionMessage(addr, regionInfo, error);
      // try our best to keep the original exception type
      if (error instanceof IOException) {
        IOException sameType = recreateWithSameType(IOException.class, error, message);
        if (sameType != null) {
          return sameType;
        }
      }
      return new HBaseIOException(message, error);
    }
  }

  /** Fails {@code call} with a {@link CallCancelledException} describing its wait and timeout. */
  static void setCancelled(Call call) {
    call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
      + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
      + call.timeout));
  }

  /** Per-thread count of nested inline executions performed by {@link #execute}. */
  private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {

    @Override
    protected MutableInt initialValue() throws Exception {
      return new MutableInt(0);
    }
  };

  /** Maximum nesting of inline executions before {@link #execute} falls back to submitting. */
  static final int MAX_DEPTH = 4;

  /**
   * Runs {@code action} on {@code eventLoop}. If the caller is already in the event loop, the
   * action runs inline unless the current nesting depth has reached {@link #MAX_DEPTH}; otherwise,
   * or when called from outside the loop, it is submitted via {@link EventLoop#execute}.
   */
  static void execute(EventLoop eventLoop, Runnable action) {
    if (eventLoop.inEventLoop()) {
      // this is used to prevent stack overflow, you can see the same trick in netty's LocalChannel
      // implementation.
      MutableInt depth = DEPTH.get();
      if (depth.intValue() < MAX_DEPTH) {
        depth.increment();
        try {
          action.run();
        } finally {
          depth.decrement();
        }
      } else {
        eventLoop.execute(action);
      }
    } else {
      eventLoop.execute(action);
    }
  }
}