001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.util.List; 022import org.apache.hadoop.hbase.DoNotRetryIOException; 023import org.apache.hadoop.hbase.client.VersionInfoUtil; 024import org.apache.hadoop.hbase.exceptions.RequestTooBigException; 025import org.apache.yetus.audience.InterfaceAudience; 026 027import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 028import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; 029import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 030import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder; 031import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException; 032 033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 035 036/** 037 * Decoder for extracting frame 038 * @since 2.0.0 039 */ 040@InterfaceAudience.Private 041class NettyRpcFrameDecoder extends ByteToMessageDecoder { 042 043 private static int FRAME_LENGTH_FIELD_LENGTH = 4; 044 045 private final int maxFrameLength; 046 final NettyServerRpcConnection connection; 047 048 private boolean requestTooBig; 049 private boolean requestTooBigSent; 050 private String requestTooBigMessage; 051 052 public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connection) { 053 this.maxFrameLength = maxFrameLength; 054 this.connection = connection; 055 } 056 057 @Override 058 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 059 if (requestTooBigSent) { 060 in.skipBytes(in.readableBytes()); 061 return; 062 } 063 if (requestTooBig) { 064 handleTooBigRequest(ctx, in); 065 return; 066 } 067 068 if (in.readableBytes() < FRAME_LENGTH_FIELD_LENGTH) { 069 return; 070 } 071 072 long frameLength = in.getUnsignedInt(in.readerIndex()); 073 074 if (frameLength < 0) { 075 throw new IOException("negative frame length field: " + frameLength); 076 } 077 078 if (frameLength > maxFrameLength) { 079 requestTooBig = true; 080 requestTooBigMessage = 081 "RPC data length of " + frameLength + " received from " + connection.getHostAddress() 082 + " is greater than max allowed " + connection.rpcServer.maxRequestSize + ". Set \"" 083 + RpcServer.MAX_REQUEST_SIZE + "\" on server to override this limit (not recommended)"; 084 085 NettyRpcServer.LOG.warn(requestTooBigMessage); 086 087 if (connection.connectionHeaderRead) { 088 handleTooBigRequest(ctx, in); 089 return; 090 } 091 ctx.channel().close(); 092 return; 093 } 094 095 int frameLengthInt = (int) frameLength; 096 if (in.readableBytes() < frameLengthInt + FRAME_LENGTH_FIELD_LENGTH) { 097 return; 098 } 099 100 in.skipBytes(FRAME_LENGTH_FIELD_LENGTH); 101 102 // extract frame 103 out.add(in.readRetainedSlice(frameLengthInt)); 104 } 105 106 private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException { 107 in.skipBytes(FRAME_LENGTH_FIELD_LENGTH); 108 in.markReaderIndex(); 109 int preIndex = in.readerIndex(); 110 int headerSize = readRawVarint32(in); 111 if (preIndex == in.readerIndex()) { 112 return; 113 } 114 if (headerSize < 0) { 115 throw new IOException("negative headerSize: " + headerSize); 116 } 117 118 if (in.readableBytes() < headerSize) { 119 NettyRpcServer.LOG.debug("headerSize is larger than readableBytes"); 120 in.resetReaderIndex(); 121 return; 122 } 123 124 RPCProtos.RequestHeader header = getHeader(in, headerSize); 125 NettyRpcServer.LOG.info("BigRequest header is = " + header); 126 127 // Notify the client about the offending request 128 NettyServerCall reqTooBig = connection.createCall(header.getCallId(), connection.service, null, 129 null, null, null, 0, connection.addr, 0, null); 130 131 RequestTooBigException reqTooBigEx = new RequestTooBigException(requestTooBigMessage); 132 connection.rpcServer.metrics.exception(reqTooBigEx); 133 134 // Make sure the client recognizes the underlying exception 135 // Otherwise, throw a DoNotRetryIOException. 136 if ( 137 VersionInfoUtil.hasMinimumVersion(connection.getVersionInfo(), 138 RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION) 139 ) { 140 reqTooBig.setResponse(null, null, reqTooBigEx, requestTooBigMessage); 141 } else { 142 reqTooBig.setResponse(null, null, new DoNotRetryIOException(requestTooBigMessage), 143 requestTooBigMessage); 144 } 145 146 // To guarantee that the message is written and flushed before closing the channel, 147 // we should call channel.writeAndFlush() directly to add the close listener 148 // instead of calling reqTooBig.sendResponseIfReady() 149 reqTooBig.param = null; 150 connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE); 151 in.skipBytes(in.readableBytes()); 152 requestTooBigSent = true; 153 // disable auto read as we do not care newer data from this channel any more 154 ctx.channel().config().setAutoRead(false); 155 } 156 157 private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException { 158 ByteBuf msg = in.readRetainedSlice(headerSize); 159 try { 160 byte[] array; 161 int offset; 162 int length = msg.readableBytes(); 163 if (msg.hasArray()) { 164 array = msg.array(); 165 offset = msg.arrayOffset() + msg.readerIndex(); 166 } else { 167 array = new byte[length]; 168 msg.getBytes(msg.readerIndex(), array, 0, length); 169 offset = 0; 170 } 171 172 RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder(); 173 ProtobufUtil.mergeFrom(builder, array, offset, length); 174 return builder.build(); 175 } finally { 176 msg.release(); 177 } 178 } 179 180 /** 181 * Reads variable length 32bit int from buffer This method is from ProtobufVarint32FrameDecoder in 182 * Netty and modified a little bit to pass the cyeckstyle rule. 183 * @return decoded int if buffers readerIndex has been forwarded else nonsense value 184 */ 185 private static int readRawVarint32(ByteBuf buffer) { 186 if (!buffer.isReadable()) { 187 return 0; 188 } 189 buffer.markReaderIndex(); 190 byte tmp = buffer.readByte(); 191 if (tmp >= 0) { 192 return tmp; 193 } else { 194 int result = tmp & 127; 195 if (!buffer.isReadable()) { 196 buffer.resetReaderIndex(); 197 return 0; 198 } 199 tmp = buffer.readByte(); 200 if (tmp >= 0) { 201 result |= tmp << 7; 202 } else { 203 result |= (tmp & 127) << 7; 204 if (!buffer.isReadable()) { 205 buffer.resetReaderIndex(); 206 return 0; 207 } 208 tmp = buffer.readByte(); 209 if (tmp >= 0) { 210 result |= tmp << 14; 211 } else { 212 result |= (tmp & 127) << 14; 213 if (!buffer.isReadable()) { 214 buffer.resetReaderIndex(); 215 return 0; 216 } 217 tmp = buffer.readByte(); 218 if (tmp >= 0) { 219 result |= tmp << 21; 220 } else { 221 result |= (tmp & 127) << 21; 222 if (!buffer.isReadable()) { 223 buffer.resetReaderIndex(); 224 return 0; 225 } 226 tmp = buffer.readByte(); 227 result |= tmp << 28; 228 if (tmp < 0) { 229 throw new CorruptedFrameException("malformed varint."); 230 } 231 } 232 } 233 } 234 return result; 235 } 236 } 237}