001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.io.OutputStream; 022import java.lang.reflect.InvocationTargetException; 023import java.net.ConnectException; 024import java.net.InetSocketAddress; 025import java.net.SocketTimeoutException; 026import java.nio.ByteBuffer; 027import java.nio.channels.ClosedChannelException; 028import java.util.concurrent.TimeoutException; 029import org.apache.hadoop.hbase.DoNotRetryIOException; 030import org.apache.hadoop.hbase.HBaseIOException; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 033import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 034import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; 035import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.ipc.RemoteException; 039import org.apache.yetus.audience.InterfaceAudience; 040 041import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 042import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; 043import org.apache.hbase.thirdparty.com.google.protobuf.Message; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 048 049/** 050 * Utility to help ipc'ing. 051 */ 052@InterfaceAudience.Private 053class IPCUtil { 054 055 /** 056 * Write out header, param, and cell block if there is one. 057 * @param dos Stream to write into 058 * @param header to write 059 * @param param to write 060 * @param cellBlock to write 061 * @return Total number of bytes written. 062 * @throws IOException if write action fails 063 */ 064 public static int write(final OutputStream dos, final Message header, final Message param, 065 final ByteBuffer cellBlock) throws IOException { 066 // Must calculate total size and write that first so other side can read it all in in one 067 // swoop. This is dictated by how the server is currently written. Server needs to change 068 // if we are to be able to write without the length prefixing. 069 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); 070 if (cellBlock != null) { 071 totalSize += cellBlock.remaining(); 072 } 073 return write(dos, header, param, cellBlock, totalSize); 074 } 075 076 private static int write(final OutputStream dos, final Message header, final Message param, 077 final ByteBuffer cellBlock, final int totalSize) throws IOException { 078 // I confirmed toBytes does same as DataOutputStream#writeInt. 079 dos.write(Bytes.toBytes(totalSize)); 080 // This allocates a buffer that is the size of the message internally. 081 header.writeDelimitedTo(dos); 082 if (param != null) { 083 param.writeDelimitedTo(dos); 084 } 085 if (cellBlock != null) { 086 dos.write(cellBlock.array(), 0, cellBlock.remaining()); 087 } 088 dos.flush(); 089 return totalSize; 090 } 091 092 /** 093 * @return Size on the wire when the two messages are written with writeDelimitedTo 094 */ 095 public static int getTotalSizeWhenWrittenDelimited(Message... messages) { 096 int totalSize = 0; 097 for (Message m : messages) { 098 if (m == null) { 099 continue; 100 } 101 totalSize += m.getSerializedSize(); 102 totalSize += CodedOutputStream.computeUInt32SizeNoTag(m.getSerializedSize()); 103 } 104 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); 105 return totalSize; 106 } 107 108 static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) { 109 RequestHeader.Builder builder = RequestHeader.newBuilder(); 110 builder.setCallId(call.id); 111 //TODO handle htrace API change, see HBASE-18895 112 /*if (call.span != null) { 113 builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId()) 114 .setTraceId(call.span.getTracerId())); 115 }*/ 116 builder.setMethodName(call.md.getName()); 117 builder.setRequestParam(call.param != null); 118 if (cellBlockMeta != null) { 119 builder.setCellBlockMeta(cellBlockMeta); 120 } 121 // Only pass priority if there is one set. 122 if (call.priority != HConstants.PRIORITY_UNSET) { 123 builder.setPriority(call.priority); 124 } 125 builder.setTimeout(call.timeout); 126 127 return builder.build(); 128 } 129 130 /** 131 * @param e exception to be wrapped 132 * @return RemoteException made from passed <code>e</code> 133 */ 134 static RemoteException createRemoteException(final ExceptionResponse e) { 135 String innerExceptionClassName = e.getExceptionClassName(); 136 boolean doNotRetry = e.getDoNotRetry(); 137 return e.hasHostname() ? 138 // If a hostname then add it to the RemoteWithExtrasException 139 new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), 140 e.getPort(), doNotRetry) 141 : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); 142 } 143 144 /** 145 * @return True if the exception is a fatal connection exception. 146 */ 147 static boolean isFatalConnectionException(final ExceptionResponse e) { 148 return e.getExceptionClassName().equals(FatalConnectionException.class.getName()); 149 } 150 151 static IOException toIOE(Throwable t) { 152 if (t instanceof IOException) { 153 return (IOException) t; 154 } else { 155 return new IOException(t); 156 } 157 } 158 159 /** 160 * Takes an Exception and the address we were trying to connect to and return an IOException with 161 * the input exception as the cause. The new exception provides the stack trace of the place where 162 * the exception is thrown and some extra diagnostics information. 163 * <p/> 164 * Notice that we will try our best to keep the original exception type when creating a new 165 * exception, especially for the 'connection' exceptions, as it is used to determine whether this 166 * is a network issue or the remote side tells us clearly what is wrong, which is very important 167 * to decide whether to retry. If it is not possible to create a new exception with the same type, 168 * for example, the {@code error} is not an {@link IOException}, an {@link IOException} will be 169 * created. 170 * @param addr target address 171 * @param error the relevant exception 172 * @return an exception to throw 173 * @see ClientExceptionsUtil#isConnectionException(Throwable) 174 */ 175 static IOException wrapException(InetSocketAddress addr, Throwable error) { 176 if (error instanceof ConnectException) { 177 // connection refused; include the host:port in the error 178 return (IOException) new ConnectException( 179 "Call to " + addr + " failed on connection exception: " + error).initCause(error); 180 } else if (error instanceof SocketTimeoutException) { 181 return (IOException) new SocketTimeoutException( 182 "Call to " + addr + " failed because " + error).initCause(error); 183 } else if (error instanceof ConnectionClosingException) { 184 return (IOException) new ConnectionClosingException( 185 "Call to " + addr + " failed on local exception: " + error).initCause(error); 186 } else if (error instanceof ServerTooBusyException) { 187 // we already have address in the exception message 188 return (IOException) error; 189 } else if (error instanceof DoNotRetryIOException) { 190 // try our best to keep the original exception type 191 try { 192 return (IOException) error.getClass().asSubclass(DoNotRetryIOException.class) 193 .getConstructor(String.class) 194 .newInstance("Call to " + addr + " failed on local exception: " + error).initCause(error); 195 } catch (InstantiationException | IllegalAccessException | IllegalArgumentException 196 | InvocationTargetException | NoSuchMethodException | SecurityException e) { 197 // just ignore, will just new a DoNotRetryIOException instead below 198 } 199 return (IOException) new DoNotRetryIOException( 200 "Call to " + addr + " failed on local exception: " + error).initCause(error); 201 } else if (error instanceof ConnectionClosedException) { 202 return (IOException) new ConnectionClosedException( 203 "Call to " + addr + " failed on local exception: " + error).initCause(error); 204 } else if (error instanceof CallTimeoutException) { 205 return (IOException) new CallTimeoutException( 206 "Call to " + addr + " failed on local exception: " + error).initCause(error); 207 } else if (error instanceof ClosedChannelException) { 208 // ClosedChannelException does not have a constructor which takes a String but it is a 209 // connection exception so we keep its original type 210 return (IOException) error; 211 } else if (error instanceof TimeoutException) { 212 // TimeoutException is not an IOException, let's convert it to TimeoutIOException. 213 return (IOException) new TimeoutIOException( 214 "Call to " + addr + " failed on local exception: " + error).initCause(error); 215 } else { 216 // try our best to keep the original exception type 217 if (error instanceof IOException) { 218 try { 219 return (IOException) error.getClass().asSubclass(IOException.class) 220 .getConstructor(String.class) 221 .newInstance("Call to " + addr + " failed on local exception: " + error) 222 .initCause(error); 223 } catch (InstantiationException | IllegalAccessException | IllegalArgumentException 224 | InvocationTargetException | NoSuchMethodException | SecurityException e) { 225 // just ignore, will just new an IOException instead below 226 } 227 } 228 return (IOException) new HBaseIOException( 229 "Call to " + addr + " failed on local exception: " + error).initCause(error); 230 } 231 } 232 233 static void setCancelled(Call call) { 234 call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime=" 235 + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" 236 + call.timeout)); 237 } 238}