001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import io.opentelemetry.context.Scope;
021import java.io.IOException;
022import java.util.HashMap;
023import java.util.Map;
024import org.apache.hadoop.hbase.CellScanner;
025import org.apache.hadoop.hbase.codec.Codec;
026import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
027import org.apache.hadoop.io.compress.CompressionCodec;
028import org.apache.hadoop.ipc.RemoteException;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import org.apache.hbase.thirdparty.com.google.protobuf.Message;
034import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
035import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
036import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
037import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
038import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
039import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
040import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
041import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
042import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
043import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
049
050/**
051 * The netty rpc handler.
052 * @since 2.0.0
053 */
054@InterfaceAudience.Private
055class NettyRpcDuplexHandler extends ChannelDuplexHandler {
056
057  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcDuplexHandler.class);
058
059  private final NettyRpcConnection conn;
060
061  private final CellBlockBuilder cellBlockBuilder;
062
063  private final Codec codec;
064
065  private final CompressionCodec compressor;
066
067  private final Map<Integer, Call> id2Call = new HashMap<>();
068
069  public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder,
070    Codec codec, CompressionCodec compressor) {
071    this.conn = conn;
072    this.cellBlockBuilder = cellBlockBuilder;
073    this.codec = codec;
074    this.compressor = compressor;
075
076  }
077
078  private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise)
079    throws IOException {
080    id2Call.put(call.id, call);
081    ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc());
082    CellBlockMeta cellBlockMeta;
083    if (cellBlock != null) {
084      CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder();
085      cellBlockMetaBuilder.setLength(cellBlock.writerIndex());
086      cellBlockMeta = cellBlockMetaBuilder.build();
087    } else {
088      cellBlockMeta = null;
089    }
090    RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta);
091    int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param);
092    int totalSize =
093      cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex() : sizeWithoutCellBlock;
094    ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4);
095    buf.writeInt(totalSize);
096    try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf)) {
097      requestHeader.writeDelimitedTo(bbos);
098      if (call.param != null) {
099        call.param.writeDelimitedTo(bbos);
100      }
101      if (cellBlock != null) {
102        ChannelPromise withoutCellBlockPromise = ctx.newPromise();
103        ctx.write(buf, withoutCellBlockPromise);
104        ChannelPromise cellBlockPromise = ctx.newPromise();
105        ctx.write(cellBlock, cellBlockPromise);
106        PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
107        combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise);
108        combiner.finish(promise);
109      } else {
110        ctx.write(buf, promise);
111      }
112    }
113  }
114
115  @Override
116  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
117    throws Exception {
118    if (msg instanceof Call) {
119      Call call = (Call) msg;
120      try (Scope scope = call.span.makeCurrent()) {
121        writeRequest(ctx, call, promise);
122      }
123    } else {
124      ctx.write(msg, promise);
125    }
126  }
127
128  private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
129    int totalSize = buf.readInt();
130    ByteBufInputStream in = new ByteBufInputStream(buf);
131    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
132    int id = responseHeader.getCallId();
133    if (LOG.isTraceEnabled()) {
134      LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader)
135        + ", totalSize: " + totalSize + " bytes");
136    }
137    RemoteException remoteExc;
138    if (responseHeader.hasException()) {
139      ExceptionResponse exceptionResponse = responseHeader.getException();
140      remoteExc = IPCUtil.createRemoteException(exceptionResponse);
141      if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
142        // Here we will cleanup all calls so do not need to fall back, just return.
143        exceptionCaught(ctx, remoteExc);
144        return;
145      }
146    } else {
147      remoteExc = null;
148    }
149    Call call = id2Call.remove(id);
150    if (call == null) {
151      // So we got a response for which we have no corresponding 'call' here on the client-side.
152      // We probably timed out waiting, cleaned up all references, and now the server decides
153      // to return a response. There is nothing we can do w/ the response at this stage. Clean
154      // out the wire of the response so its out of the way and we can get other responses on
155      // this connection.
156      if (LOG.isDebugEnabled()) {
157        int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
158        int whatIsLeftToRead = totalSize - readSoFar;
159        LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead
160          + " bytes");
161      }
162      return;
163    }
164    if (remoteExc != null) {
165      call.setException(remoteExc);
166      return;
167    }
168    Message value;
169    if (call.responseDefaultType != null) {
170      Message.Builder builder = call.responseDefaultType.newBuilderForType();
171      builder.mergeDelimitedFrom(in);
172      value = builder.build();
173    } else {
174      value = null;
175    }
176    CellScanner cellBlockScanner;
177    if (responseHeader.hasCellBlockMeta()) {
178      int size = responseHeader.getCellBlockMeta().getLength();
179      // Maybe we could read directly from the ByteBuf.
180      // The problem here is that we do not know when to release it.
181      byte[] cellBlock = new byte[size];
182      buf.readBytes(cellBlock);
183      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
184    } else {
185      cellBlockScanner = null;
186    }
187    call.setResponse(value, cellBlockScanner);
188  }
189
190  @Override
191  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
192    if (msg instanceof ByteBuf) {
193      ByteBuf buf = (ByteBuf) msg;
194      try {
195        readResponse(ctx, buf);
196      } finally {
197        buf.release();
198      }
199    } else {
200      super.channelRead(ctx, msg);
201    }
202  }
203
204  private void cleanupCalls(IOException error) {
205    for (Call call : id2Call.values()) {
206      call.setException(error);
207    }
208    id2Call.clear();
209  }
210
211  @Override
212  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
213    if (!id2Call.isEmpty()) {
214      cleanupCalls(new ConnectionClosedException("Connection closed"));
215    }
216    conn.shutdown();
217    ctx.fireChannelInactive();
218  }
219
220  @Override
221  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
222    if (!id2Call.isEmpty()) {
223      cleanupCalls(IPCUtil.toIOE(cause));
224    }
225    conn.shutdown();
226  }
227
228  @Override
229  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
230    if (evt instanceof IdleStateEvent) {
231      IdleStateEvent idleEvt = (IdleStateEvent) evt;
232      switch (idleEvt.state()) {
233        case WRITER_IDLE:
234          if (id2Call.isEmpty()) {
235            if (LOG.isTraceEnabled()) {
236              LOG.trace("shutdown connection to " + conn.remoteId().address
237                + " because idle for a long time");
238            }
239            // It may happen that there are still some pending calls in the event loop queue and
240            // they will get a closed channel exception. But this is not a big deal as it rarely
241            // rarely happens and the upper layer could retry immediately.
242            conn.shutdown();
243          }
244          break;
245        default:
246          LOG.warn("Unrecognized idle state " + idleEvt.state());
247          break;
248      }
249    } else if (evt instanceof CallEvent) {
250      // just remove the call for now until we add other call event other than timeout and cancel.
251      id2Call.remove(((CallEvent) evt).call.id);
252    } else {
253      ctx.fireUserEventTriggered(evt);
254    }
255  }
256}