001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.net.InetAddress; 023import java.net.Socket; 024import java.nio.ByteBuffer; 025import java.nio.channels.ReadableByteChannel; 026import java.nio.channels.SocketChannel; 027import java.util.concurrent.ConcurrentLinkedDeque; 028import java.util.concurrent.atomic.LongAdder; 029import java.util.concurrent.locks.Lock; 030import java.util.concurrent.locks.ReentrantLock; 031 032import org.apache.hadoop.hbase.CellScanner; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.apache.hadoop.hbase.client.VersionInfoUtil; 036import org.apache.hadoop.hbase.exceptions.RequestTooBigException; 037import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 038import org.apache.hadoop.hbase.nio.ByteBuff; 039import org.apache.hadoop.hbase.nio.SingleByteBuff; 040import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 041import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 042import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 043import org.apache.hbase.thirdparty.com.google.protobuf.Message; 044import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 046import org.apache.hadoop.hbase.util.Pair; 047 048/** Reads calls from a connection and queues them for handling. */ 049@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", 050 justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") 051@InterfaceAudience.Private 052class SimpleServerRpcConnection extends ServerRpcConnection { 053 054 final SocketChannel channel; 055 private ByteBuff data; 056 private ByteBuffer dataLengthBuffer; 057 private ByteBuffer preambleBuffer; 058 private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs 059 private long lastContact; 060 private final Socket socket; 061 final SimpleRpcServerResponder responder; 062 063 // If initial preamble with version and magic has been read or not. 064 private boolean connectionPreambleRead = false; 065 066 final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>(); 067 final Lock responseWriteLock = new ReentrantLock(); 068 long lastSentTime = -1L; 069 070 public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel, 071 long lastContact) { 072 super(rpcServer); 073 this.channel = channel; 074 this.lastContact = lastContact; 075 this.data = null; 076 this.dataLengthBuffer = ByteBuffer.allocate(4); 077 this.socket = channel.socket(); 078 this.addr = socket.getInetAddress(); 079 if (addr == null) { 080 this.hostAddress = "*Unknown*"; 081 } else { 082 this.hostAddress = addr.getHostAddress(); 083 } 084 this.remotePort = socket.getPort(); 085 if (rpcServer.socketSendBufferSize != 0) { 086 try { 087 socket.setSendBufferSize(rpcServer.socketSendBufferSize); 088 } catch (IOException e) { 089 SimpleRpcServer.LOG.warn( 090 "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize); 091 } 092 } 093 this.responder = rpcServer.responder; 094 } 095 096 public void setLastContact(long lastContact) { 097 this.lastContact = lastContact; 098 } 099 100 public long getLastContact() { 101 return lastContact; 102 } 103 104 /* Return true if the connection has no outstanding rpc */ 105 boolean isIdle() { 106 return rpcCount.sum() == 0; 107 } 108 109 /* Decrement the outstanding RPC count */ 110 protected void decRpcCount() { 111 rpcCount.decrement(); 112 } 113 114 /* Increment the outstanding RPC count */ 115 protected void incRpcCount() { 116 rpcCount.increment(); 117 } 118 119 private int readPreamble() throws IOException { 120 if (preambleBuffer == null) { 121 preambleBuffer = ByteBuffer.allocate(6); 122 } 123 int count = this.rpcServer.channelRead(channel, preambleBuffer); 124 if (count < 0 || preambleBuffer.remaining() > 0) { 125 return count; 126 } 127 preambleBuffer.flip(); 128 if (!processPreamble(preambleBuffer)) { 129 return -1; 130 } 131 preambleBuffer = null; // do not need it anymore 132 connectionPreambleRead = true; 133 return count; 134 } 135 136 private int read4Bytes() throws IOException { 137 if (this.dataLengthBuffer.remaining() > 0) { 138 return this.rpcServer.channelRead(channel, this.dataLengthBuffer); 139 } else { 140 return 0; 141 } 142 } 143 144 /** 145 * Read off the wire. If there is not enough data to read, update the connection state with what 146 * we have and returns. 147 * @return Returns -1 if failure (and caller will close connection), else zero or more. 148 * @throws IOException 149 * @throws InterruptedException 150 */ 151 public int readAndProcess() throws IOException, InterruptedException { 152 // If we have not read the connection setup preamble, look to see if that is on the wire. 153 if (!connectionPreambleRead) { 154 int count = readPreamble(); 155 if (!connectionPreambleRead) { 156 return count; 157 } 158 } 159 160 // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the 161 // integer length into the 4-byte this.dataLengthBuffer. 162 int count = read4Bytes(); 163 if (count < 0 || dataLengthBuffer.remaining() > 0) { 164 return count; 165 } 166 167 // We have read a length and we have read the preamble. It is either the connection header 168 // or it is a request. 169 if (data == null) { 170 dataLengthBuffer.flip(); 171 int dataLength = dataLengthBuffer.getInt(); 172 if (dataLength == RpcClient.PING_CALL_ID) { 173 if (!useWrap) { // covers the !useSasl too 174 dataLengthBuffer.clear(); 175 return 0; // ping message 176 } 177 } 178 if (dataLength < 0) { // A data length of zero is legal. 179 throw new DoNotRetryIOException( 180 "Unexpected data length " + dataLength + "!! from " + getHostAddress()); 181 } 182 183 if (dataLength > this.rpcServer.maxRequestSize) { 184 String msg = "RPC data length of " + dataLength + " received from " + getHostAddress() + 185 " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \"" + 186 SimpleRpcServer.MAX_REQUEST_SIZE + 187 "\" on server to override this limit (not recommended)"; 188 SimpleRpcServer.LOG.warn(msg); 189 190 if (connectionHeaderRead && connectionPreambleRead) { 191 incRpcCount(); 192 // Construct InputStream for the non-blocking SocketChannel 193 // We need the InputStream because we want to read only the request header 194 // instead of the whole rpc. 195 ByteBuffer buf = ByteBuffer.allocate(1); 196 InputStream is = new InputStream() { 197 @Override 198 public int read() throws IOException { 199 SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf); 200 buf.flip(); 201 int x = buf.get(); 202 buf.flip(); 203 return x; 204 } 205 }; 206 CodedInputStream cis = CodedInputStream.newInstance(is); 207 int headerSize = cis.readRawVarint32(); 208 Message.Builder builder = RequestHeader.newBuilder(); 209 ProtobufUtil.mergeFrom(builder, cis, headerSize); 210 RequestHeader header = (RequestHeader) builder.build(); 211 212 // Notify the client about the offending request 213 SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null, 214 null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0, 215 this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder); 216 this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); 217 // Make sure the client recognizes the underlying exception 218 // Otherwise, throw a DoNotRetryIOException. 219 if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(), 220 RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) { 221 reqTooBig.setResponse(null, null, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, msg); 222 } else { 223 reqTooBig.setResponse(null, null, new DoNotRetryIOException(), msg); 224 } 225 // In most cases we will write out the response directly. If not, it is still OK to just 226 // close the connection without writing out the reqTooBig response. Do not try to write 227 // out directly here, and it will cause deserialization error if the connection is slow 228 // and we have a half writing response in the queue. 229 reqTooBig.sendResponseIfReady(); 230 } 231 // Close the connection 232 return -1; 233 } 234 235 // Initialize this.data with a ByteBuff. 236 // This call will allocate a ByteBuff to read request into and assign to this.data 237 // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and 238 // assign to this.callCleanup 239 initByteBuffToReadInto(dataLength); 240 241 // Increment the rpc count. This counter will be decreased when we write 242 // the response. If we want the connection to be detected as idle properly, we 243 // need to keep the inc / dec correct. 244 incRpcCount(); 245 } 246 247 count = channelDataRead(channel, data); 248 249 if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 250 process(); 251 } 252 253 return count; 254 } 255 256 // It creates the ByteBuff and CallCleanup and assign to Connection instance. 257 private void initByteBuffToReadInto(int length) { 258 // We create random on heap buffers are read into those when 259 // 1. ByteBufferPool is not there. 260 // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is 261 // waste then. Also if all the reqs are of this size, we will be creating larger sized 262 // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like 263 // RegionOpen. 264 // 3. If it is an initial handshake signal or initial connection request. Any way then 265 // condition 2 itself will match 266 // 4. When SASL use is ON. 267 if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || 268 useSasl || length < this.rpcServer.minSizeForReservoirUse) { 269 this.data = new SingleByteBuff(ByteBuffer.allocate(length)); 270 } else { 271 Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto( 272 this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length); 273 this.data = pair.getFirst(); 274 this.callCleanup = pair.getSecond(); 275 } 276 } 277 278 protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException { 279 int count = buf.read(channel); 280 if (count > 0) { 281 this.rpcServer.metrics.receivedBytes(count); 282 } 283 return count; 284 } 285 286 /** 287 * Process the data buffer and clean the connection state for the next call. 288 */ 289 private void process() throws IOException, InterruptedException { 290 data.rewind(); 291 try { 292 if (skipInitialSaslHandshake) { 293 skipInitialSaslHandshake = false; 294 return; 295 } 296 297 if (useSasl) { 298 saslReadAndProcess(data); 299 } else { 300 processOneRpc(data); 301 } 302 303 } finally { 304 dataLengthBuffer.clear(); // Clean for the next call 305 data = null; // For the GC 306 this.callCleanup = null; 307 } 308 } 309 310 @Override 311 public synchronized void close() { 312 disposeSasl(); 313 data = null; 314 callCleanup = null; 315 if (!channel.isOpen()) return; 316 try { 317 socket.shutdownOutput(); 318 } catch (Exception ignored) { 319 if (SimpleRpcServer.LOG.isTraceEnabled()) { 320 SimpleRpcServer.LOG.trace("Ignored exception", ignored); 321 } 322 } 323 if (channel.isOpen()) { 324 try { 325 channel.close(); 326 } catch (Exception ignored) { 327 } 328 } 329 try { 330 socket.close(); 331 } catch (Exception ignored) { 332 if (SimpleRpcServer.LOG.isTraceEnabled()) { 333 SimpleRpcServer.LOG.trace("Ignored exception", ignored); 334 } 335 } 336 } 337 338 @Override 339 public boolean isConnectionOpen() { 340 return channel.isOpen(); 341 } 342 343 @Override 344 public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md, 345 RequestHeader header, Message param, CellScanner cellScanner, long size, 346 InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { 347 return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size, 348 remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir, 349 this.rpcServer.cellBlockBuilder, reqCleanup, this.responder); 350 } 351 352 @Override 353 protected void doRespond(RpcResponse resp) throws IOException { 354 responder.doRespond(this, resp); 355 } 356}