001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * <p>
010 * http://www.apache.org/licenses/LICENSE-2.0
011 * <p>
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.List;
022
023import org.apache.hadoop.hbase.DoNotRetryIOException;
024import org.apache.hadoop.hbase.client.VersionInfoUtil;
025import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
026import org.apache.yetus.audience.InterfaceAudience;
027
028import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
029import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
030import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
031import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
032import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException;
033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
035
036
037/**
038 * Decoder for extracting frame
039 *
040 * @since 2.0.0
041 */
042@InterfaceAudience.Private
043public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
044
045  private static int FRAME_LENGTH_FIELD_LENGTH = 4;
046
047  private final int maxFrameLength;
048  private boolean requestTooBig;
049  private String requestTooBigMessage;
050
051  public NettyRpcFrameDecoder(int maxFrameLength) {
052    this.maxFrameLength = maxFrameLength;
053  }
054
055  private NettyServerRpcConnection connection;
056
057  void setConnection(NettyServerRpcConnection connection) {
058    this.connection = connection;
059  }
060
061  @Override
062  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
063    throws Exception {
064    if (requestTooBig) {
065      handleTooBigRequest(in);
066      return;
067    }
068
069    if (in.readableBytes() < FRAME_LENGTH_FIELD_LENGTH) {
070      return;
071    }
072
073    long frameLength = in.getUnsignedInt(in.readerIndex());
074
075    if (frameLength < 0) {
076      throw new IOException("negative frame length field: " + frameLength);
077    }
078
079    if (frameLength > maxFrameLength) {
080      requestTooBig = true;
081      requestTooBigMessage =
082        "RPC data length of " + frameLength + " received from " + connection.getHostAddress()
083          + " is greater than max allowed " + connection.rpcServer.maxRequestSize + ". Set \""
084          + SimpleRpcServer.MAX_REQUEST_SIZE
085          + "\" on server to override this limit (not recommended)";
086
087      NettyRpcServer.LOG.warn(requestTooBigMessage);
088
089      if (connection.connectionHeaderRead) {
090        handleTooBigRequest(in);
091        return;
092      }
093      ctx.channel().close();
094      return;
095    }
096
097    int frameLengthInt = (int) frameLength;
098    if (in.readableBytes() < frameLengthInt + FRAME_LENGTH_FIELD_LENGTH) {
099      return;
100    }
101
102    in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
103
104    // extract frame
105    out.add(in.readRetainedSlice(frameLengthInt));
106  }
107
108  private void handleTooBigRequest(ByteBuf in) throws IOException {
109    in.markReaderIndex();
110    int preIndex = in.readerIndex();
111    int headerSize = readRawVarint32(in);
112    if (preIndex == in.readerIndex()) {
113      return;
114    }
115    if (headerSize < 0) {
116      throw new IOException("negative headerSize: " + headerSize);
117    }
118
119    if (in.readableBytes() < headerSize) {
120      in.resetReaderIndex();
121      return;
122    }
123
124    RPCProtos.RequestHeader header = getHeader(in, headerSize);
125
126    // Notify the client about the offending request
127    NettyServerCall reqTooBig =
128      new NettyServerCall(header.getCallId(), connection.service, null, null, null, null,
129        connection, 0, connection.addr, System.currentTimeMillis(), 0,
130        connection.rpcServer.reservoir, connection.rpcServer.cellBlockBuilder, null);
131
132    connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
133
134    // Make sure the client recognizes the underlying exception
135    // Otherwise, throw a DoNotRetryIOException.
136    if (VersionInfoUtil.hasMinimumVersion(connection.connectionHeader.getVersionInfo(),
137      RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
138      reqTooBig.setResponse(null, null,
139        SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, requestTooBigMessage);
140    } else {
141      reqTooBig.setResponse(null, null, new DoNotRetryIOException(), requestTooBigMessage);
142    }
143
144    // To guarantee that the message is written and flushed before closing the channel,
145    // we should call channel.writeAndFlush() directly to add the close listener
146    // instead of calling reqTooBig.sendResponseIfReady()
147    reqTooBig.param = null;
148    connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE);
149  }
150
151  private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {
152    ByteBuf msg = in.readRetainedSlice(headerSize);
153    try {
154      byte[] array;
155      int offset;
156      int length = msg.readableBytes();
157      if (msg.hasArray()) {
158        array = msg.array();
159        offset = msg.arrayOffset() + msg.readerIndex();
160      } else {
161        array = new byte[length];
162        msg.getBytes(msg.readerIndex(), array, 0, length);
163        offset = 0;
164      }
165
166      RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder();
167      ProtobufUtil.mergeFrom(builder, array, offset, length);
168      return builder.build();
169    } finally {
170      msg.release();
171    }
172  }
173
174  /**
175   * Reads variable length 32bit int from buffer
176   * This method is from ProtobufVarint32FrameDecoder in Netty and modified a little bit
177   * to pass the cyeckstyle rule.
178   *
179   * @return decoded int if buffers readerIndex has been forwarded else nonsense value
180   */
181  private static int readRawVarint32(ByteBuf buffer) {
182    if (!buffer.isReadable()) {
183      return 0;
184    }
185    buffer.markReaderIndex();
186    byte tmp = buffer.readByte();
187    if (tmp >= 0) {
188      return tmp;
189    } else {
190      int result = tmp & 127;
191      if (!buffer.isReadable()) {
192        buffer.resetReaderIndex();
193        return 0;
194      }
195      tmp = buffer.readByte();
196      if (tmp >= 0) {
197        result |= tmp << 7;
198      } else {
199        result |= (tmp & 127) << 7;
200        if (!buffer.isReadable()) {
201          buffer.resetReaderIndex();
202          return 0;
203        }
204        tmp = buffer.readByte();
205        if (tmp >= 0) {
206          result |= tmp << 14;
207        } else {
208          result |= (tmp & 127) << 14;
209          if (!buffer.isReadable()) {
210            buffer.resetReaderIndex();
211            return 0;
212          }
213          tmp = buffer.readByte();
214          if (tmp >= 0) {
215            result |= tmp << 21;
216          } else {
217            result |= (tmp & 127) << 21;
218            if (!buffer.isReadable()) {
219              buffer.resetReaderIndex();
220              return 0;
221            }
222            tmp = buffer.readByte();
223            result |= tmp << 28;
224            if (tmp < 0) {
225              throw new CorruptedFrameException("malformed varint.");
226            }
227          }
228        }
229      }
230      return result;
231    }
232  }
233}