001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.util.HashMap;
021import java.util.concurrent.TimeUnit;
022import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
023import org.apache.hadoop.hbase.util.NettyFutureUtils;
024import org.apache.yetus.audience.InterfaceAudience;
025
026import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
027import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
028import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
029import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
030import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
031import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
032import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
033import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
034
035/**
036 * Used to decode preamble calls.
037 */
038@InterfaceAudience.Private
039class PreambleCallHandler extends SimpleChannelInboundHandler<ByteBuf> {
040
041  private final NettyRpcConnection conn;
042
043  private final byte[] preambleHeader;
044
045  private final Call preambleCall;
046
047  PreambleCallHandler(NettyRpcConnection conn, byte[] preambleHeader, Call preambleCall) {
048    this.conn = conn;
049    this.preambleHeader = preambleHeader;
050    this.preambleCall = preambleCall;
051  }
052
053  @Override
054  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
055    NettyFutureUtils.safeWriteAndFlush(ctx,
056      Unpooled.directBuffer(preambleHeader.length).writeBytes(preambleHeader));
057  }
058
059  @Override
060  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
061    try {
062      conn.readResponse(new ByteBufInputStream(buf), new HashMap<>(), preambleCall,
063        remoteExc -> exceptionCaught(ctx, remoteExc));
064    } finally {
065      ChannelPipeline p = ctx.pipeline();
066      p.remove("PreambleCallReadTimeoutHandler");
067      p.remove("PreambleCallFrameDecoder");
068      p.remove(this);
069    }
070  }
071
072  @Override
073  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
074    preambleCall.setException(new ConnectionClosedException("Connection closed"));
075    ctx.fireChannelInactive();
076  }
077
078  @Override
079  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
080    preambleCall.setException(IPCUtil.toIOE(cause));
081  }
082
083  public static void setup(ChannelPipeline pipeline, int readTimeoutMs, NettyRpcConnection conn,
084    byte[] preambleHeader, Call preambleCall) {
085    // we do not use single decode here, as for a preamble call, we do not expect the server side
086    // will return multiple responses
087    pipeline
088      .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallReadTimeoutHandler",
089        new ReadTimeoutHandler(readTimeoutMs, TimeUnit.MILLISECONDS))
090      .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallFrameDecoder",
091        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
092      .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallHandler",
093        new PreambleCallHandler(conn, preambleHeader, preambleCall));
094  }
095}