/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;

/**
 * The netty rpc handler.
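 * <p>
 * Serializes outbound {@link Call}s into the request wire format (request header, protobuf
 * parameter and optional cell block) and decodes inbound response buffers, matching each response
 * to its pending call through the call id kept in {@code id2Call}.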
 * @since 2.0.0
 */
@InterfaceAudience.Private
class NettyRpcDuplexHandler extends ChannelDuplexHandler {

  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcDuplexHandler.class);

  private final NettyRpcConnection conn;

  private final CellBlockBuilder cellBlockBuilder;

  private final Codec codec;

  private final CompressionCodec compressor;

  private final Map<Integer, Call> id2Call = new HashMap<>();

  public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder,
    Codec codec, CompressionCodec compressor) {
    this.conn = conn;
    this.cellBlockBuilder = cellBlockBuilder;
    this.codec = codec;
    this.compressor = compressor;
  }

  private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise)
    throws IOException {
    id2Call.put(call.id, call);
    ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc());
    CellBlockMeta cellBlockMeta;
    if (cellBlock != null) {
      CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder();
      cellBlockMetaBuilder.setLength(cellBlock.writerIndex());
      cellBlockMeta = cellBlockMetaBuilder.build();
    } else {
      cellBlockMeta = null;
    }
    RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta);
    int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param);
    int totalSize =
      cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex() : sizeWithoutCellBlock;
    ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4);
    buf.writeInt(totalSize);
    try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf)) {
      requestHeader.writeDelimitedTo(bbos);
      if (call.param != null) {
        call.param.writeDelimitedTo(bbos);
      }
      if (cellBlock != null) {
        ChannelPromise withoutCellBlockPromise = ctx.newPromise();
        ctx.write(buf, withoutCellBlockPromise);
        ChannelPromise cellBlockPromise = ctx.newPromise();
        ctx.write(cellBlock, cellBlockPromise);
        PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise);
        combiner.finish(promise);
      } else {
        ctx.write(buf, promise);
      }
    }
  }

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    throws Exception {
    if (msg instanceof Call) {
      Call call = (Call) msg;
      try (Scope scope = call.span.makeCurrent()) {
        writeRequest(ctx, call, promise);
      }
    } else {
      ctx.write(msg, promise);
    }
  }

  private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
    int totalSize = buf.readInt();
    ByteBufInputStream in = new ByteBufInputStream(buf);
    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
    int id = responseHeader.getCallId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader)
        + ", totalSize: " + totalSize + " bytes");
    }
    RemoteException remoteExc;
    if (responseHeader.hasException()) {
      ExceptionResponse exceptionResponse = responseHeader.getException();
      remoteExc = IPCUtil.createRemoteException(exceptionResponse);
      if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
        // Here we will clean up all calls, so there is no need to fall through; just return.
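        // exceptionCaught() fails every outstanding call with this exception and shuts down the
        // connection, which is why the per-call handling below is skipped.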
        exceptionCaught(ctx, remoteExc);
        return;
      }
    } else {
      remoteExc = null;
    }
    Call call = id2Call.remove(id);
    if (call == null) {
      // So we got a response for which we have no corresponding 'call' here on the client-side.
      // We probably timed out waiting, cleaned up all references, and now the server decides
      // to return a response. There is nothing we can do with the response at this stage. Clean
      // out the wire of the response so it's out of the way and we can get other responses on
      // this connection.
      if (LOG.isDebugEnabled()) {
        int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
        int whatIsLeftToRead = totalSize - readSoFar;
        LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead
          + " bytes");
      }
      return;
    }
    if (remoteExc != null) {
      call.setException(remoteExc);
      return;
    }
    Message value;
    if (call.responseDefaultType != null) {
      Message.Builder builder = call.responseDefaultType.newBuilderForType();
      builder.mergeDelimitedFrom(in);
      value = builder.build();
    } else {
      value = null;
    }
    CellScanner cellBlockScanner;
    if (responseHeader.hasCellBlockMeta()) {
      int size = responseHeader.getCellBlockMeta().getLength();
      // Maybe we could read directly from the ByteBuf.
      // The problem here is that we do not know when to release it.
      byte[] cellBlock = new byte[size];
      buf.readBytes(cellBlock);
      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
    } else {
      cellBlockScanner = null;
    }
    call.setResponse(value, cellBlockScanner);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
      ByteBuf buf = (ByteBuf) msg;
      try {
        readResponse(ctx, buf);
      } finally {
        buf.release();
      }
    } else {
      super.channelRead(ctx, msg);
    }
  }

  private void cleanupCalls(IOException error) {
    for (Call call : id2Call.values()) {
      call.setException(error);
    }
    id2Call.clear();
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    if (!id2Call.isEmpty()) {
      cleanupCalls(new ConnectionClosedException("Connection closed"));
    }
    conn.shutdown();
    ctx.fireChannelInactive();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    if (!id2Call.isEmpty()) {
      cleanupCalls(IPCUtil.toIOE(cause));
    }
    conn.shutdown();
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent idleEvt = (IdleStateEvent) evt;
      switch (idleEvt.state()) {
        case WRITER_IDLE:
          if (id2Call.isEmpty()) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("shutdown connection to " + conn.remoteId().address
                + " because idle for a long time");
            }
            // It may happen that there are still some pending calls in the event loop queue and
            // they will get a closed channel exception. But this is not a big deal as it rarely
            // happens and the upper layer could retry immediately.
            conn.shutdown();
          }
          break;
        default:
          LOG.warn("Unrecognized idle state " + idleEvt.state());
          break;
      }
    } else if (evt instanceof CallEvent) {
      // Just remove the call for now, until we add call events other than timeout and cancel.
      id2Call.remove(((CallEvent) evt).call.id);
    } else {
      ctx.fireUserEventTriggered(evt);
    }
  }
}