/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The netty rpc handler.
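 * <p>
 * Encodes outbound {@link Call}s into the RPC wire format (a 4-byte total length, the delimited
 * request header, the optional delimited param and the optional cell block) and decodes inbound
 * response frames, matching each response to its pending call by call id. It also fails all
 * outstanding calls when the connection drops or an error is caught, and shuts down connections
 * that have been write-idle for too long.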
 * @since 2.0.0
 */
@InterfaceAudience.Private
class NettyRpcDuplexHandler extends ChannelDuplexHandler {

  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcDuplexHandler.class);

  private final NettyRpcConnection conn;

  private final CellBlockBuilder cellBlockBuilder;

  private final Codec codec;

  private final CompressionCodec compressor;

  private final Map<Integer, Call> id2Call = new HashMap<>();

  public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder,
      Codec codec, CompressionCodec compressor) {
    this.conn = conn;
    this.cellBlockBuilder = cellBlockBuilder;
    this.codec = codec;
    this.compressor = compressor;
  }

  private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise)
      throws IOException {
    id2Call.put(call.id, call);
    ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc());
    CellBlockMeta cellBlockMeta;
    if (cellBlock != null) {
      CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder();
      cellBlockMetaBuilder.setLength(cellBlock.writerIndex());
      cellBlockMeta = cellBlockMetaBuilder.build();
    } else {
      cellBlockMeta = null;
    }
    RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta);
    int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param);
    int totalSize = cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex()
        : sizeWithoutCellBlock;
    // Frame layout: 4-byte total size, then the delimited request header, the optional delimited
    // param and finally the raw cell block bytes.
    ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4);
    buf.writeInt(totalSize);
    try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf)) {
      requestHeader.writeDelimitedTo(bbos);
      if (call.param != null) {
        call.param.writeDelimitedTo(bbos);
      }
      if (cellBlock != null) {
        ChannelPromise withoutCellBlockPromise = ctx.newPromise();
        ctx.write(buf, withoutCellBlockPromise);
        ChannelPromise cellBlockPromise = ctx.newPromise();
        ctx.write(cellBlock, cellBlockPromise);
        PromiseCombiner combiner = new PromiseCombiner();
        combiner.addAll(withoutCellBlockPromise, cellBlockPromise);
        combiner.finish(promise);
      } else {
        ctx.write(buf, promise);
      }
    }
  }

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
      throws Exception {
    if (msg instanceof Call) {
      writeRequest(ctx, (Call) msg, promise);
    } else {
      ctx.write(msg, promise);
    }
  }

  private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
    int totalSize = buf.readInt();
    ByteBufInputStream in = new ByteBufInputStream(buf);
    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
    int id = responseHeader.getCallId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader)
          + ", totalSize: " + totalSize + " bytes");
    }
    RemoteException remoteExc;
    if (responseHeader.hasException()) {
      ExceptionResponse exceptionResponse = responseHeader.getException();
      remoteExc = IPCUtil.createRemoteException(exceptionResponse);
      if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
        // Here we will clean up all calls, so there is no need to fall through; just return.
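        // exceptionCaught fails every pending call in id2Call with this remote exception and shuts
        // down the connection, so there is nothing left to parse from this response.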
        exceptionCaught(ctx, remoteExc);
        return;
      }
    } else {
      remoteExc = null;
    }
    Call call = id2Call.remove(id);
    if (call == null) {
      // So we got a response for which we have no corresponding 'call' here on the client-side.
      // We probably timed out waiting, cleaned up all references, and now the server decides
      // to return a response. There is nothing we can do with the response at this stage. Clean
      // out the wire of the response so it's out of the way and we can get other responses on
      // this connection.
      int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
      int whatIsLeftToRead = totalSize - readSoFar;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead
            + " bytes");
      }
      return;
    }
    if (remoteExc != null) {
      call.setException(remoteExc);
      return;
    }
    Message value;
    if (call.responseDefaultType != null) {
      Builder builder = call.responseDefaultType.newBuilderForType();
      builder.mergeDelimitedFrom(in);
      value = builder.build();
    } else {
      value = null;
    }
    CellScanner cellBlockScanner;
    if (responseHeader.hasCellBlockMeta()) {
      int size = responseHeader.getCellBlockMeta().getLength();
      // Maybe we could read directly from the ByteBuf.
      // The problem here is that we do not know when to release it.
      byte[] cellBlock = new byte[size];
      buf.readBytes(cellBlock);
      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
    } else {
      cellBlockScanner = null;
    }
    call.setResponse(value, cellBlockScanner);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
      ByteBuf buf = (ByteBuf) msg;
      try {
        readResponse(ctx, buf);
      } finally {
        buf.release();
      }
    } else {
      super.channelRead(ctx, msg);
    }
  }

  private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
    for (Call call : id2Call.values()) {
      call.setException(error);
    }
    id2Call.clear();
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    if (!id2Call.isEmpty()) {
      cleanupCalls(ctx, new ConnectionClosedException("Connection closed"));
    }
    conn.shutdown();
    ctx.fireChannelInactive();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    if (!id2Call.isEmpty()) {
      cleanupCalls(ctx, IPCUtil.toIOE(cause));
    }
    conn.shutdown();
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent idleEvt = (IdleStateEvent) evt;
      switch (idleEvt.state()) {
        case WRITER_IDLE:
          if (id2Call.isEmpty()) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("shutdown connection to " + conn.remoteId().address
                  + " because idle for a long time");
            }
            // It may happen that there are still some pending calls in the event loop queue and
            // they will get a closed channel exception. But this is not a big deal as it rarely
            // happens and the upper layer could retry immediately.
            conn.shutdown();
          }
          break;
        default:
          LOG.warn("Unrecognized idle state " + idleEvt.state());
          break;
      }
    } else if (evt instanceof CallEvent) {
      // Just remove the call for now, until we add call events other than timeout and cancel.
      id2Call.remove(((CallEvent) evt).call.id);
    } else {
      ctx.fireUserEventTriggered(evt);
    }
  }
}
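
/*
 * Illustrative sketch only, not part of this class: NettyRpcDuplexHandler expects to sit behind a
 * frame decoder that leaves the 4-byte length prefix in place (readResponse re-reads it) and an
 * IdleStateHandler that fires the WRITER_IDLE events handled above. The real wiring lives in
 * NettyRpcConnection; the handler names and the 60-second timeout below are made-up placeholders.
 *
 *   ChannelPipeline p = channel.pipeline();
 *   p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
 *   p.addLast("idle", new IdleStateHandler(0, 60, 0, TimeUnit.SECONDS)); // write-idle only
 *   p.addLast("rpc", new NettyRpcDuplexHandler(conn, cellBlockBuilder, codec, compressor));
 */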