001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.List;
022import org.apache.hadoop.hbase.DoNotRetryIOException;
023import org.apache.hadoop.hbase.client.VersionInfoUtil;
024import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
025import org.apache.yetus.audience.InterfaceAudience;
026
027import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
028import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
029import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
030import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
031import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException;
032
033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
035
036/**
037 * Decoder for extracting frame
038 * @since 2.0.0
039 */
040@InterfaceAudience.Private
041class NettyRpcFrameDecoder extends ByteToMessageDecoder {
042
043  private static int FRAME_LENGTH_FIELD_LENGTH = 4;
044
045  private final int maxFrameLength;
046  final NettyServerRpcConnection connection;
047
048  private boolean requestTooBig;
049  private boolean requestTooBigSent;
050  private String requestTooBigMessage;
051
052  public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connection) {
053    this.maxFrameLength = maxFrameLength;
054    this.connection = connection;
055  }
056
057  @Override
058  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
059    if (requestTooBigSent) {
060      in.skipBytes(in.readableBytes());
061      return;
062    }
063    if (requestTooBig) {
064      handleTooBigRequest(ctx, in);
065      return;
066    }
067
068    if (in.readableBytes() < FRAME_LENGTH_FIELD_LENGTH) {
069      return;
070    }
071
072    long frameLength = in.getUnsignedInt(in.readerIndex());
073
074    if (frameLength < 0) {
075      throw new IOException("negative frame length field: " + frameLength);
076    }
077
078    if (frameLength > maxFrameLength) {
079      requestTooBig = true;
080      requestTooBigMessage =
081        "RPC data length of " + frameLength + " received from " + connection.getHostAddress()
082          + " is greater than max allowed " + connection.rpcServer.maxRequestSize + ". Set \""
083          + RpcServer.MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)";
084
085      NettyRpcServer.LOG.warn(requestTooBigMessage);
086
087      if (connection.connectionHeaderRead) {
088        handleTooBigRequest(ctx, in);
089        return;
090      }
091      ctx.channel().close();
092      return;
093    }
094
095    int frameLengthInt = (int) frameLength;
096    if (in.readableBytes() < frameLengthInt + FRAME_LENGTH_FIELD_LENGTH) {
097      return;
098    }
099
100    in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
101
102    // extract frame
103    out.add(in.readRetainedSlice(frameLengthInt));
104  }
105
106  private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
107    in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
108    in.markReaderIndex();
109    int preIndex = in.readerIndex();
110    int headerSize = readRawVarint32(in);
111    if (preIndex == in.readerIndex()) {
112      return;
113    }
114    if (headerSize < 0) {
115      throw new IOException("negative headerSize: " + headerSize);
116    }
117
118    if (in.readableBytes() < headerSize) {
119      NettyRpcServer.LOG.debug("headerSize is larger than readableBytes");
120      in.resetReaderIndex();
121      return;
122    }
123
124    RPCProtos.RequestHeader header = getHeader(in, headerSize);
125    NettyRpcServer.LOG.info("BigRequest header is = " + header);
126
127    // Notify the client about the offending request
128    NettyServerCall reqTooBig = connection.createCall(header.getCallId(), connection.service, null,
129      null, null, null, 0, connection.addr, 0, null);
130
131    RequestTooBigException reqTooBigEx = new RequestTooBigException(requestTooBigMessage);
132    connection.rpcServer.metrics.exception(reqTooBigEx);
133
134    // Make sure the client recognizes the underlying exception
135    // Otherwise, throw a DoNotRetryIOException.
136    if (
137      VersionInfoUtil.hasMinimumVersion(connection.getVersionInfo(),
138        RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)
139    ) {
140      reqTooBig.setResponse(null, null, reqTooBigEx, requestTooBigMessage);
141    } else {
142      reqTooBig.setResponse(null, null, new DoNotRetryIOException(requestTooBigMessage),
143        requestTooBigMessage);
144    }
145
146    // To guarantee that the message is written and flushed before closing the channel,
147    // we should call channel.writeAndFlush() directly to add the close listener
148    // instead of calling reqTooBig.sendResponseIfReady()
149    reqTooBig.param = null;
150    connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE);
151    in.skipBytes(in.readableBytes());
152    requestTooBigSent = true;
153    // disable auto read as we do not care newer data from this channel any more
154    ctx.channel().config().setAutoRead(false);
155  }
156
157  private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {
158    ByteBuf msg = in.readRetainedSlice(headerSize);
159    try {
160      byte[] array;
161      int offset;
162      int length = msg.readableBytes();
163      if (msg.hasArray()) {
164        array = msg.array();
165        offset = msg.arrayOffset() + msg.readerIndex();
166      } else {
167        array = new byte[length];
168        msg.getBytes(msg.readerIndex(), array, 0, length);
169        offset = 0;
170      }
171
172      RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder();
173      ProtobufUtil.mergeFrom(builder, array, offset, length);
174      return builder.build();
175    } finally {
176      msg.release();
177    }
178  }
179
180  /**
181   * Reads variable length 32bit int from buffer This method is from ProtobufVarint32FrameDecoder in
182   * Netty and modified a little bit to pass the cyeckstyle rule.
183   * @return decoded int if buffers readerIndex has been forwarded else nonsense value
184   */
185  private static int readRawVarint32(ByteBuf buffer) {
186    if (!buffer.isReadable()) {
187      return 0;
188    }
189    buffer.markReaderIndex();
190    byte tmp = buffer.readByte();
191    if (tmp >= 0) {
192      return tmp;
193    } else {
194      int result = tmp & 127;
195      if (!buffer.isReadable()) {
196        buffer.resetReaderIndex();
197        return 0;
198      }
199      tmp = buffer.readByte();
200      if (tmp >= 0) {
201        result |= tmp << 7;
202      } else {
203        result |= (tmp & 127) << 7;
204        if (!buffer.isReadable()) {
205          buffer.resetReaderIndex();
206          return 0;
207        }
208        tmp = buffer.readByte();
209        if (tmp >= 0) {
210          result |= tmp << 14;
211        } else {
212          result |= (tmp & 127) << 14;
213          if (!buffer.isReadable()) {
214            buffer.resetReaderIndex();
215            return 0;
216          }
217          tmp = buffer.readByte();
218          if (tmp >= 0) {
219            result |= tmp << 21;
220          } else {
221            result |= (tmp & 127) << 21;
222            if (!buffer.isReadable()) {
223              buffer.resetReaderIndex();
224              return 0;
225            }
226            tmp = buffer.readByte();
227            result |= tmp << 28;
228            if (tmp < 0) {
229              throw new CorruptedFrameException("malformed varint.");
230            }
231          }
232        }
233      }
234      return result;
235    }
236  }
237}