001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.List;
022import org.apache.hadoop.hbase.DoNotRetryIOException;
023import org.apache.hadoop.hbase.client.VersionInfoUtil;
024import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
025import org.apache.yetus.audience.InterfaceAudience;
026
027import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
028import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
029import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
030import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
031import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException;
032
033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
035
036/**
037 * Decoder for extracting frame
038 * @since 2.0.0
039 */
040@InterfaceAudience.Private
041public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
042
043  private static int FRAME_LENGTH_FIELD_LENGTH = 4;
044
045  private final int maxFrameLength;
046  private boolean requestTooBig;
047  private boolean requestTooBigSent;
048  private String requestTooBigMessage;
049
050  public NettyRpcFrameDecoder(int maxFrameLength) {
051    this.maxFrameLength = maxFrameLength;
052  }
053
054  NettyServerRpcConnection connection;
055
056  void setConnection(NettyServerRpcConnection connection) {
057    this.connection = connection;
058  }
059
060  @Override
061  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
062    if (requestTooBigSent) {
063      in.skipBytes(in.readableBytes());
064      return;
065    }
066    if (requestTooBig) {
067      handleTooBigRequest(ctx, in);
068      return;
069    }
070
071    if (in.readableBytes() < FRAME_LENGTH_FIELD_LENGTH) {
072      return;
073    }
074
075    long frameLength = in.getUnsignedInt(in.readerIndex());
076
077    if (frameLength < 0) {
078      throw new IOException("negative frame length field: " + frameLength);
079    }
080
081    if (frameLength > maxFrameLength) {
082      requestTooBig = true;
083      requestTooBigMessage = "RPC data length of " + frameLength + " received from "
084        + connection.getHostAddress() + " is greater than max allowed "
085        + connection.rpcServer.maxRequestSize + ". Set \"" + SimpleRpcServer.MAX_REQUEST_SIZE
086        + "\" on server to override this limit (not recommended)";
087
088      NettyRpcServer.LOG.warn(requestTooBigMessage);
089
090      if (connection.connectionHeaderRead) {
091        handleTooBigRequest(ctx, in);
092        return;
093      }
094      ctx.channel().close();
095      return;
096    }
097
098    int frameLengthInt = (int) frameLength;
099    if (in.readableBytes() < frameLengthInt + FRAME_LENGTH_FIELD_LENGTH) {
100      return;
101    }
102
103    in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
104
105    // extract frame
106    out.add(in.readRetainedSlice(frameLengthInt));
107  }
108
109  private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
110    in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
111    in.markReaderIndex();
112    int preIndex = in.readerIndex();
113    int headerSize = readRawVarint32(in);
114    if (preIndex == in.readerIndex()) {
115      return;
116    }
117    if (headerSize < 0) {
118      throw new IOException("negative headerSize: " + headerSize);
119    }
120
121    if (in.readableBytes() < headerSize) {
122      NettyRpcServer.LOG.debug("headerSize is larger than readableBytes");
123      in.resetReaderIndex();
124      return;
125    }
126
127    RPCProtos.RequestHeader header = getHeader(in, headerSize);
128    NettyRpcServer.LOG.info("BigRequest header is = " + header);
129
130    // Notify the client about the offending request
131    NettyServerCall reqTooBig = connection.createCall(header.getCallId(), connection.service, null,
132      null, null, null, 0, connection.addr, 0, null);
133
134    RequestTooBigException reqTooBigEx = new RequestTooBigException(requestTooBigMessage);
135    connection.rpcServer.metrics.exception(reqTooBigEx);
136
137    // Make sure the client recognizes the underlying exception
138    // Otherwise, throw a DoNotRetryIOException.
139    if (
140      VersionInfoUtil.hasMinimumVersion(connection.connectionHeader.getVersionInfo(),
141        RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)
142    ) {
143      reqTooBig.setResponse(null, null, reqTooBigEx, requestTooBigMessage);
144    } else {
145      reqTooBig.setResponse(null, null, new DoNotRetryIOException(requestTooBigMessage),
146        requestTooBigMessage);
147    }
148
149    // To guarantee that the message is written and flushed before closing the channel,
150    // we should call channel.writeAndFlush() directly to add the close listener
151    // instead of calling reqTooBig.sendResponseIfReady()
152    reqTooBig.param = null;
153    connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE);
154    in.skipBytes(in.readableBytes());
155    requestTooBigSent = true;
156    // disable auto read as we do not care newer data from this channel any more
157    ctx.channel().config().setAutoRead(false);
158  }
159
160  private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {
161    ByteBuf msg = in.readRetainedSlice(headerSize);
162    try {
163      byte[] array;
164      int offset;
165      int length = msg.readableBytes();
166      if (msg.hasArray()) {
167        array = msg.array();
168        offset = msg.arrayOffset() + msg.readerIndex();
169      } else {
170        array = new byte[length];
171        msg.getBytes(msg.readerIndex(), array, 0, length);
172        offset = 0;
173      }
174
175      RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder();
176      ProtobufUtil.mergeFrom(builder, array, offset, length);
177      return builder.build();
178    } finally {
179      msg.release();
180    }
181  }
182
183  /**
184   * Reads variable length 32bit int from buffer This method is from ProtobufVarint32FrameDecoder in
185   * Netty and modified a little bit to pass the cyeckstyle rule.
186   * @return decoded int if buffers readerIndex has been forwarded else nonsense value
187   */
188  private static int readRawVarint32(ByteBuf buffer) {
189    if (!buffer.isReadable()) {
190      return 0;
191    }
192    buffer.markReaderIndex();
193    byte tmp = buffer.readByte();
194    if (tmp >= 0) {
195      return tmp;
196    } else {
197      int result = tmp & 127;
198      if (!buffer.isReadable()) {
199        buffer.resetReaderIndex();
200        return 0;
201      }
202      tmp = buffer.readByte();
203      if (tmp >= 0) {
204        result |= tmp << 7;
205      } else {
206        result |= (tmp & 127) << 7;
207        if (!buffer.isReadable()) {
208          buffer.resetReaderIndex();
209          return 0;
210        }
211        tmp = buffer.readByte();
212        if (tmp >= 0) {
213          result |= tmp << 14;
214        } else {
215          result |= (tmp & 127) << 14;
216          if (!buffer.isReadable()) {
217            buffer.resetReaderIndex();
218            return 0;
219          }
220          tmp = buffer.readByte();
221          if (tmp >= 0) {
222            result |= tmp << 21;
223          } else {
224            result |= (tmp & 127) << 21;
225            if (!buffer.isReadable()) {
226              buffer.resetReaderIndex();
227              return 0;
228            }
229            tmp = buffer.readByte();
230            result |= tmp << 28;
231            if (tmp < 0) {
232              throw new CorruptedFrameException("malformed varint.");
233            }
234          }
235        }
236      }
237      return result;
238    }
239  }
240}