/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
 * Utility methods to help with IPC.
 */
@InterfaceAudience.Private
class IPCUtil {

  /**
   * Write out header, param, and cell block if there is one.
   * @param dos Stream to write into
   * @param header to write
   * @param param to write
   * @param cellBlock to write
   * @return Total number of bytes written.
   * @throws IOException if write action fails
   */
  public static int write(final OutputStream dos, final Message header, final Message param,
      final ByteBuf cellBlock) throws IOException {
    // Must calculate total size and write that first so the other side can read it all in one
    // swoop. This is dictated by how the server is currently written. The server needs to change
    // if we are to be able to write without the length prefixing.
    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
    if (cellBlock != null) {
      totalSize += cellBlock.readableBytes();
    }
    return write(dos, header, param, cellBlock, totalSize);
  }

  private static int write(final OutputStream dos, final Message header, final Message param,
      final ByteBuf cellBlock, final int totalSize) throws IOException {
    // I confirmed toBytes does the same as DataOutputStream#writeInt.
    dos.write(Bytes.toBytes(totalSize));
    // This allocates a buffer that is the size of the message internally.
    header.writeDelimitedTo(dos);
    if (param != null) {
      param.writeDelimitedTo(dos);
    }
    if (cellBlock != null) {
      cellBlock.readBytes(dos, cellBlock.readableBytes());
    }
    dos.flush();
    return totalSize;
  }

  /**
   * @return Size on the wire when the two messages are written with writeDelimitedTo
   */
  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
    int totalSize = 0;
    for (Message m : messages) {
      if (m == null) {
        continue;
      }
      totalSize += m.getSerializedSize();
      totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize());
    }
    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
    return totalSize;
  }

  /**
   * Builds the RequestHeader for the given call, attaching cell block metadata, priority and
   * timeout when they are set.
   */
  static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
    RequestHeader.Builder builder = RequestHeader.newBuilder();
    builder.setCallId(call.id);
    // TODO handle htrace API change, see HBASE-18895
    /*if (call.span != null) {
      builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
        .setTraceId(call.span.getTracerId()));
    }*/
    builder.setMethodName(call.md.getName());
    builder.setRequestParam(call.param != null);
    if (cellBlockMeta != null) {
      builder.setCellBlockMeta(cellBlockMeta);
    }
    // Only pass priority if there is one set.
    if (call.priority != HConstants.PRIORITY_UNSET) {
      builder.setPriority(call.priority);
    }
    builder.setTimeout(call.timeout);

    return builder.build();
  }

  /**
   * @param e exception to be wrapped
   * @return RemoteException made from passed <code>e</code>
   */
  static RemoteException createRemoteException(final ExceptionResponse e) {
    String innerExceptionClassName = e.getExceptionClassName();
    boolean doNotRetry = e.getDoNotRetry();
    return e.hasHostname() ?
      // If there is a hostname, add it to the RemoteWithExtrasException
      new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
        e.getPort(), doNotRetry)
      : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
  }

  /**
   * @return True if the exception is a fatal connection exception.
   */
  static boolean isFatalConnectionException(final ExceptionResponse e) {
    return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
  }

  /** Wraps the given Throwable in an IOException if it is not one already. */
  static IOException toIOE(Throwable t) {
    if (t instanceof IOException) {
      return (IOException) t;
    } else {
      return new IOException(t);
    }
  }

  /**
   * Takes an Exception and the address we were trying to connect to and returns an IOException
   * with the input exception as the cause. The new exception provides the stack trace of the
   * place where the exception is thrown and some extra diagnostic information.
   * <p/>
   * Notice that we will try our best to keep the original exception type when creating a new
   * exception, especially for the 'connection' exceptions, as the type is used to determine
   * whether this is a network issue or the remote side is clearly telling us what is wrong,
   * which is very important for deciding whether to retry. If it is not possible to create a new
   * exception with the same type, for example, the {@code error} is not an {@link IOException},
   * an {@link IOException} will be created.
   * @param addr target address
   * @param error the relevant exception
   * @return an exception to throw
   * @see ClientExceptionsUtil#isConnectionException(Throwable)
   */
  static IOException wrapException(InetSocketAddress addr, Throwable error) {
    if (error instanceof ConnectException) {
      // connection refused; include the host:port in the error
      return (IOException) new ConnectException(
        "Call to " + addr + " failed on connection exception: " + error).initCause(error);
    } else if (error instanceof SocketTimeoutException) {
      return (IOException) new SocketTimeoutException(
        "Call to " + addr + " failed because " + error).initCause(error);
    } else if (error instanceof ConnectionClosingException) {
      return new ConnectionClosingException("Call to " + addr + " failed on local exception: "
        + error, error);
    } else if (error instanceof ServerTooBusyException) {
      // we already have the address in the exception message
      return (IOException) error;
    } else if (error instanceof DoNotRetryIOException) {
      // try our best to keep the original exception type
      try {
        return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class)
          .getConstructor(String.class)
          .newInstance("Call to " + addr + " failed on local exception: " + error).initCause(error);
      } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
          | InvocationTargetException | NoSuchMethodException | SecurityException e) {
        // just ignore, fall through to create a DoNotRetryIOException below
      }
      return new DoNotRetryIOException("Call to " + addr + " failed on local exception: "
        + error, error);
    } else if (error instanceof ConnectionClosedException) {
      return new ConnectionClosedException("Call to " + addr + " failed on local exception: "
        + error, error);
    } else if (error instanceof CallTimeoutException) {
      return new CallTimeoutException("Call to " + addr + " failed on local exception: "
        + error, error);
    } else if (error instanceof ClosedChannelException) {
      // ClosedChannelException does not have a constructor which takes a String but it is a
      // connection exception so we keep its original type
      return (IOException) error;
    } else if (error instanceof TimeoutException) {
      // TimeoutException is not an IOException, let's convert it to TimeoutIOException.
      return new TimeoutIOException("Call to " + addr + " failed on local exception: "
        + error, error);
    } else {
      // try our best to keep the original exception type
      if (error instanceof IOException) {
        try {
          return (IOException) error.getClass().asSubclass(IOException.class)
            .getConstructor(String.class)
            .newInstance("Call to " + addr + " failed on local exception: " + error)
            .initCause(error);
        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
            | InvocationTargetException | NoSuchMethodException | SecurityException e) {
          // just ignore, fall through to create an HBaseIOException below
        }
      }
      return new HBaseIOException("Call to " + addr + " failed on local exception: "
        + error, error);
    }
  }

  /**
   * Fails the call with a CallCancelledException that records how long the call waited and its
   * configured rpc timeout.
   */
  static void setCancelled(Call call) {
    call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
      + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
      + call.timeout));
  }

  // Tracks how many levels deep we are in re-entrant calls to execute() on the current event
  // loop thread; see MAX_DEPTH and execute() below.
  private static final FastThreadLocal<MutableInt> DEPTH = new FastThreadLocal<MutableInt>() {

    @Override
    protected MutableInt initialValue() throws Exception {
      return new MutableInt(0);
    }
  };

  @VisibleForTesting
  static final int MAX_DEPTH = 4;

  static void execute(EventLoop eventLoop, Runnable action) {
    if (eventLoop.inEventLoop()) {
      // this is used to prevent stack overflow; you can see the same trick in netty's
      // LocalChannel implementation.
      MutableInt depth = DEPTH.get();
      if (depth.intValue() < MAX_DEPTH) {
        depth.increment();
        try {
          action.run();
        } finally {
          depth.decrement();
        }
      } else {
        eventLoop.execute(action);
      }
    } else {
      eventLoop.execute(action);
    }
  }
}