001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * <p> 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * <p> 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.util.List; 022 023import org.apache.hadoop.hbase.DoNotRetryIOException; 024import org.apache.hadoop.hbase.client.VersionInfoUtil; 025import org.apache.hadoop.hbase.exceptions.RequestTooBigException; 026import org.apache.yetus.audience.InterfaceAudience; 027 028import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 029import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; 030import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 031import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder; 032import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException; 033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 035 036 037/** 038 * Decoder for extracting frame 039 * 040 * @since 2.0.0 041 */ 042@InterfaceAudience.Private 043public class NettyRpcFrameDecoder extends ByteToMessageDecoder { 044 045 private static int FRAME_LENGTH_FIELD_LENGTH = 4; 046 047 private final int maxFrameLength; 048 private boolean requestTooBig; 049 private String requestTooBigMessage; 050 051 public NettyRpcFrameDecoder(int maxFrameLength) { 052 this.maxFrameLength = maxFrameLength; 053 } 054 055 private NettyServerRpcConnection connection; 056 057 void setConnection(NettyServerRpcConnection connection) { 058 this.connection = connection; 059 } 060 061 @Override 062 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) 063 throws Exception { 064 if (requestTooBig) { 065 handleTooBigRequest(in); 066 return; 067 } 068 069 if (in.readableBytes() < FRAME_LENGTH_FIELD_LENGTH) { 070 return; 071 } 072 073 long frameLength = in.getUnsignedInt(in.readerIndex()); 074 075 if (frameLength < 0) { 076 throw new IOException("negative frame length field: " + frameLength); 077 } 078 079 if (frameLength > maxFrameLength) { 080 requestTooBig = true; 081 requestTooBigMessage = 082 "RPC data length of " + frameLength + " received from " + connection.getHostAddress() 083 + " is greater than max allowed " + connection.rpcServer.maxRequestSize + ". Set \"" 084 + SimpleRpcServer.MAX_REQUEST_SIZE 085 + "\" on server to override this limit (not recommended)"; 086 087 NettyRpcServer.LOG.warn(requestTooBigMessage); 088 089 if (connection.connectionHeaderRead) { 090 handleTooBigRequest(in); 091 return; 092 } 093 ctx.channel().close(); 094 return; 095 } 096 097 int frameLengthInt = (int) frameLength; 098 if (in.readableBytes() < frameLengthInt + FRAME_LENGTH_FIELD_LENGTH) { 099 return; 100 } 101 102 in.skipBytes(FRAME_LENGTH_FIELD_LENGTH); 103 104 // extract frame 105 out.add(in.readRetainedSlice(frameLengthInt)); 106 } 107 108 private void handleTooBigRequest(ByteBuf in) throws IOException { 109 in.markReaderIndex(); 110 int preIndex = in.readerIndex(); 111 int headerSize = readRawVarint32(in); 112 if (preIndex == in.readerIndex()) { 113 return; 114 } 115 if (headerSize < 0) { 116 throw new IOException("negative headerSize: " + headerSize); 117 } 118 119 if (in.readableBytes() < headerSize) { 120 in.resetReaderIndex(); 121 return; 122 } 123 124 RPCProtos.RequestHeader header = getHeader(in, headerSize); 125 126 // Notify the client about the offending request 127 NettyServerCall reqTooBig = 128 new NettyServerCall(header.getCallId(), connection.service, null, null, null, null, 129 connection, 0, connection.addr, System.currentTimeMillis(), 0, 130 connection.rpcServer.reservoir, connection.rpcServer.cellBlockBuilder, null); 131 132 connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); 133 134 // Make sure the client recognizes the underlying exception 135 // Otherwise, throw a DoNotRetryIOException. 136 if (VersionInfoUtil.hasMinimumVersion(connection.connectionHeader.getVersionInfo(), 137 RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) { 138 reqTooBig.setResponse(null, null, 139 SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, requestTooBigMessage); 140 } else { 141 reqTooBig.setResponse(null, null, new DoNotRetryIOException(), requestTooBigMessage); 142 } 143 144 // To guarantee that the message is written and flushed before closing the channel, 145 // we should call channel.writeAndFlush() directly to add the close listener 146 // instead of calling reqTooBig.sendResponseIfReady() 147 reqTooBig.param = null; 148 connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE); 149 } 150 151 private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException { 152 ByteBuf msg = in.readRetainedSlice(headerSize); 153 try { 154 byte[] array; 155 int offset; 156 int length = msg.readableBytes(); 157 if (msg.hasArray()) { 158 array = msg.array(); 159 offset = msg.arrayOffset() + msg.readerIndex(); 160 } else { 161 array = new byte[length]; 162 msg.getBytes(msg.readerIndex(), array, 0, length); 163 offset = 0; 164 } 165 166 RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder(); 167 ProtobufUtil.mergeFrom(builder, array, offset, length); 168 return builder.build(); 169 } finally { 170 msg.release(); 171 } 172 } 173 174 /** 175 * Reads variable length 32bit int from buffer 176 * This method is from ProtobufVarint32FrameDecoder in Netty and modified a little bit 177 * to pass the cyeckstyle rule. 178 * 179 * @return decoded int if buffers readerIndex has been forwarded else nonsense value 180 */ 181 private static int readRawVarint32(ByteBuf buffer) { 182 if (!buffer.isReadable()) { 183 return 0; 184 } 185 buffer.markReaderIndex(); 186 byte tmp = buffer.readByte(); 187 if (tmp >= 0) { 188 return tmp; 189 } else { 190 int result = tmp & 127; 191 if (!buffer.isReadable()) { 192 buffer.resetReaderIndex(); 193 return 0; 194 } 195 tmp = buffer.readByte(); 196 if (tmp >= 0) { 197 result |= tmp << 7; 198 } else { 199 result |= (tmp & 127) << 7; 200 if (!buffer.isReadable()) { 201 buffer.resetReaderIndex(); 202 return 0; 203 } 204 tmp = buffer.readByte(); 205 if (tmp >= 0) { 206 result |= tmp << 14; 207 } else { 208 result |= (tmp & 127) << 14; 209 if (!buffer.isReadable()) { 210 buffer.resetReaderIndex(); 211 return 0; 212 } 213 tmp = buffer.readByte(); 214 if (tmp >= 0) { 215 result |= tmp << 21; 216 } else { 217 result |= (tmp & 127) << 21; 218 if (!buffer.isReadable()) { 219 buffer.resetReaderIndex(); 220 return 0; 221 } 222 tmp = buffer.readByte(); 223 result |= tmp << 28; 224 if (tmp < 0) { 225 throw new CorruptedFrameException("malformed varint."); 226 } 227 } 228 } 229 } 230 return result; 231 } 232 } 233}