/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;

/**
 * Decoder for extracting frames from the inbound byte stream.
 * <p>
 * Each frame is a 4-byte unsigned length field followed by that many payload bytes. For every
 * complete frame the payload (without the length field) is emitted as a retained slice.
 * <p>
 * When a length field exceeds {@code maxFrameLength} the decoder stops emitting frames. If the
 * connection header has already been read, it parses the request header that follows the length
 * field, answers that call with a "request too big" error and closes the channel once the
 * response is flushed; otherwise it closes the channel right away.
 * @since 2.0.0
 */
@InterfaceAudience.Private
public class NettyRpcFrameDecoder extends ByteToMessageDecoder {

  /** Size in bytes of the length field that prefixes every frame. */
  private static final int FRAME_LENGTH_FIELD_LENGTH = 4;

  /** Largest payload length accepted before a request is treated as too big. */
  private final int maxFrameLength;
  /** Set once an oversized frame length has been seen; no further frames are extracted. */
  private boolean requestTooBig;
  /** Set once the error response has been written; all later input is discarded. */
  private boolean requestTooBigSent;
  /**
   * Set once the length field of the oversized frame has been consumed, so that repeated calls
   * to {@link #handleTooBigRequest} (while waiting for the rest of the header) do not skip it
   * again and eat into the header bytes.
   */
  private boolean frameLengthFieldSkipped;
  /** Message logged and sent back to the client for the oversized request. */
  private String requestTooBigMessage;

  /**
   * @param maxFrameLength maximum payload length, in bytes, of a frame that will be extracted
   */
  public NettyRpcFrameDecoder(int maxFrameLength) {
    this.maxFrameLength = maxFrameLength;
  }

  /** Connection this decoder serves; must be set before any data is decoded. */
  NettyServerRpcConnection connection;

  void setConnection(NettyServerRpcConnection connection) {
    this.connection = connection;
  }

  /**
   * Extracts at most one frame from {@code in} and adds its payload to {@code out}.
   * <p>
   * Returns without consuming anything when the length field or the payload is not yet fully
   * available.
   * @param ctx channel context, used to close the channel or stop reading on oversized requests
   * @param in  accumulated inbound bytes
   * @param out receives a retained slice holding the frame payload
   */
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (requestTooBigSent) {
      // The error response is already on its way; drop whatever else arrives.
      in.skipBytes(in.readableBytes());
      return;
    }
    if (requestTooBig) {
      // Still waiting for enough bytes to parse the header of the oversized request.
      handleTooBigRequest(ctx, in);
      return;
    }

    if (in.readableBytes() < FRAME_LENGTH_FIELD_LENGTH) {
      return;
    }

    // Peek at the length field without moving the reader index. The value is read as an
    // unsigned 32-bit integer into a long, so it can never be negative.
    long frameLength = in.getUnsignedInt(in.readerIndex());

    if (frameLength > maxFrameLength) {
      requestTooBig = true;
      requestTooBigMessage = "RPC data length of " + frameLength + " received from "
        + connection.getHostAddress() + " is greater than max allowed "
        + connection.rpcServer.maxRequestSize + ". Set \"" + SimpleRpcServer.MAX_REQUEST_SIZE
        + "\" on server to override this limit (not recommended)";

      NettyRpcServer.LOG.warn(requestTooBigMessage);

      if (connection.connectionHeaderRead) {
        handleTooBigRequest(ctx, in);
        return;
      }
      ctx.channel().close();
      return;
    }

    // Safe narrowing: frameLength <= maxFrameLength, which is an int.
    int frameLengthInt = (int) frameLength;
    // Written as a subtraction so that a frame length close to Integer.MAX_VALUE cannot
    // overflow; at least FRAME_LENGTH_FIELD_LENGTH bytes are known to be readable here.
    if (in.readableBytes() - FRAME_LENGTH_FIELD_LENGTH < frameLengthInt) {
      return;
    }

    in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);

    // extract frame
    out.add(in.readRetainedSlice(frameLengthInt));
  }

  /**
   * Parses the request header of an oversized request and replies to that call with an error,
   * closing the channel after the response has been flushed.
   * <p>
   * May be invoked several times for the same request: if the varint-encoded header size or the
   * header itself is not yet completely buffered, the reader index is rewound to the start of
   * the varint and the method returns so it can be retried when more bytes are available.
   * @param ctx channel context, used to disable auto read once the response is queued
   * @param in  accumulated inbound bytes, positioned at the length field on the first call
   * @throws IOException if the decoded header size is negative or the header cannot be parsed
   */
  private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
    if (!frameLengthFieldSkipped) {
      // Consume the length field exactly once. Skipping it again on a retry would discard the
      // first bytes of the header (or fail if fewer than four bytes are buffered).
      in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
      frameLengthFieldSkipped = true;
    }
    in.markReaderIndex();
    int preIndex = in.readerIndex();
    int headerSize = readRawVarint32(in);
    if (preIndex == in.readerIndex()) {
      // Reader index did not move: the varint is not fully buffered yet.
      return;
    }
    if (headerSize < 0) {
      throw new IOException("negative headerSize: " + headerSize);
    }

    if (in.readableBytes() < headerSize) {
      NettyRpcServer.LOG.debug("headerSize is larger than readableBytes");
      // Rewind to the start of the varint so the next attempt re-reads it.
      in.resetReaderIndex();
      return;
    }

    RPCProtos.RequestHeader header = getHeader(in, headerSize);
    NettyRpcServer.LOG.info("BigRequest header is = " + header);

    // Notify the client about the offending request
    NettyServerCall reqTooBig = connection.createCall(header.getCallId(), connection.service, null,
      null, null, null, 0, connection.addr, 0, null);

    RequestTooBigException reqTooBigEx = new RequestTooBigException(requestTooBigMessage);
    connection.rpcServer.metrics.exception(reqTooBigEx);

    // Make sure the client recognizes the underlying exception
    // Otherwise, throw a DoNotRetryIOException.
    if (
      VersionInfoUtil.hasMinimumVersion(connection.connectionHeader.getVersionInfo(),
        RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)
    ) {
      reqTooBig.setResponse(null, null, reqTooBigEx, requestTooBigMessage);
    } else {
      reqTooBig.setResponse(null, null, new DoNotRetryIOException(requestTooBigMessage),
        requestTooBigMessage);
    }

    // To guarantee that the message is written and flushed before closing the channel,
    // we should call channel.writeAndFlush() directly to add the close listener
    // instead of calling reqTooBig.sendResponseIfReady()
    reqTooBig.param = null;
    connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE);
    in.skipBytes(in.readableBytes());
    requestTooBigSent = true;
    // disable auto read as we do not care newer data from this channel any more
    ctx.channel().config().setAutoRead(false);
  }

  /**
   * Reads {@code headerSize} bytes from {@code in} and parses them as a request header.
   * @param in         buffer positioned at the first header byte; advanced by {@code headerSize}
   * @param headerSize number of bytes that make up the serialized header
   * @return the parsed request header
   * @throws IOException if the bytes cannot be parsed
   */
  private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {
    ByteBuf msg = in.readRetainedSlice(headerSize);
    try {
      byte[] array;
      int offset;
      int length = msg.readableBytes();
      if (msg.hasArray()) {
        // Buffer exposes a backing array: parse in place without copying.
        array = msg.array();
        offset = msg.arrayOffset() + msg.readerIndex();
      } else {
        // No accessible backing array: copy the header bytes out first.
        array = new byte[length];
        msg.getBytes(msg.readerIndex(), array, 0, length);
        offset = 0;
      }

      RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder();
      ProtobufUtil.mergeFrom(builder, array, offset, length);
      return builder.build();
    } finally {
      // Balance the retain done by readRetainedSlice.
      msg.release();
    }
  }

  /**
   * Reads variable length 32bit int from buffer This method is from ProtobufVarint32FrameDecoder in
   * Netty and modified a little bit to pass the checkstyle rule.
   * <p>
   * If the buffer runs out before the varint is complete, the reader index is reset to where it
   * was on entry and 0 is returned; callers must compare the reader index before and after the
   * call to tell this apart from a decoded value.
   * @param buffer buffer to read from; its reader-index mark is overwritten by this method
   * @return decoded int if buffers readerIndex has been forwarded else nonsense value
   * @throws CorruptedFrameException if the fifth byte still has its continuation bit set
   */
  private static int readRawVarint32(ByteBuf buffer) {
    if (!buffer.isReadable()) {
      return 0;
    }
    buffer.markReaderIndex();
    byte tmp = buffer.readByte();
    if (tmp >= 0) {
      return tmp;
    } else {
      int result = tmp & 127;
      if (!buffer.isReadable()) {
        buffer.resetReaderIndex();
        return 0;
      }
      tmp = buffer.readByte();
      if (tmp >= 0) {
        result |= tmp << 7;
      } else {
        result |= (tmp & 127) << 7;
        if (!buffer.isReadable()) {
          buffer.resetReaderIndex();
          return 0;
        }
        tmp = buffer.readByte();
        if (tmp >= 0) {
          result |= tmp << 14;
        } else {
          result |= (tmp & 127) << 14;
          if (!buffer.isReadable()) {
            buffer.resetReaderIndex();
            return 0;
          }
          tmp = buffer.readByte();
          if (tmp >= 0) {
            result |= tmp << 21;
          } else {
            result |= (tmp & 127) << 21;
            if (!buffer.isReadable()) {
              buffer.resetReaderIndex();
              return 0;
            }
            tmp = buffer.readByte();
            result |= tmp << 28;
            if (tmp < 0) {
              throw new CorruptedFrameException("malformed varint.");
            }
          }
        }
      }
      return result;
    }
  }
}