/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
 * Reads calls from a connection and queues them for handling.
 * <p>
 * The wire format consumed by {@link #readAndProcess()} is: a 6-byte connection preamble, then a
 * sequence of frames, each made of a 4-byte length followed by that many bytes of payload.
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
@InterfaceAudience.Private
class SimpleServerRpcConnection extends ServerRpcConnection {

  final SocketChannel channel;
  // Payload of the frame currently being read; null between frames.
  private ByteBuff data;
  // Holds the 4-byte length prefix of the frame currently being read.
  private ByteBuffer dataLengthBuffer;
  // Holds the connection preamble while it is being read; null once it has been processed.
  private ByteBuffer preambleBuffer;
  private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
  private long lastContact;
  private final Socket socket;
  final SimpleRpcServerResponder responder;

  // If initial preamble with version and magic has been read or not.
  private boolean connectionPreambleRead = false;

  final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>();
  final Lock responseWriteLock = new ReentrantLock();
  long lastSentTime = -1L;

  /**
   * Creates a connection wrapping the given channel.
   * @param rpcServer the server this connection belongs to; supplies the responder and the
   *          optional socket send buffer size
   * @param channel the socket channel to read requests from
   * @param lastContact initial value for {@link #getLastContact()}
   */
  public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel,
      long lastContact) {
    super(rpcServer);
    this.channel = channel;
    this.lastContact = lastContact;
    this.data = null;
    this.dataLengthBuffer = ByteBuffer.allocate(4);
    this.socket = channel.socket();
    this.addr = socket.getInetAddress();
    if (addr == null) {
      this.hostAddress = "*Unknown*";
    } else {
      this.hostAddress = addr.getHostAddress();
    }
    this.remotePort = socket.getPort();
    if (rpcServer.socketSendBufferSize != 0) {
      try {
        socket.setSendBufferSize(rpcServer.socketSendBufferSize);
      } catch (IOException e) {
        // Best effort: the connection stays usable with the default buffer size. Keep the cause
        // in the log so the failure can be diagnosed.
        SimpleRpcServer.LOG.warn(
          "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize,
          e);
      }
    }
    this.responder = rpcServer.responder;
  }

  /**
   * Records the last-contact value for this connection.
   * @param lastContact the new value
   */
  public void setLastContact(long lastContact) {
    this.lastContact = lastContact;
  }

  /**
   * @return the value most recently passed to the constructor or {@link #setLastContact(long)}
   */
  public long getLastContact() {
    return lastContact;
  }

  /* Return true if the connection has no outstanding rpc */
  boolean isIdle() {
    return rpcCount.sum() == 0;
  }

  /* Decrement the outstanding RPC count */
  protected void decRpcCount() {
    rpcCount.decrement();
  }

  /* Increment the outstanding RPC count */
  protected void incRpcCount() {
    rpcCount.increment();
  }

  /**
   * Reads (possibly only part of) the 6-byte connection preamble and, once it is complete, hands
   * it to {@code processPreamble}.
   * @return the count returned by the channel read, or -1 if the channel read failed or the
   *         preamble was rejected
   * @throws IOException if reading from the channel fails
   */
  private int readPreamble() throws IOException {
    if (preambleBuffer == null) {
      preambleBuffer = ByteBuffer.allocate(6);
    }
    int count = this.rpcServer.channelRead(channel, preambleBuffer);
    if (count < 0 || preambleBuffer.remaining() > 0) {
      // Read failed, or the preamble is still incomplete; the buffer is kept for the next call.
      return count;
    }
    preambleBuffer.flip();
    if (!processPreamble(preambleBuffer)) {
      return -1;
    }
    preambleBuffer = null; // do not need it anymore
    connectionPreambleRead = true;
    return count;
  }

  /**
   * Reads into the 4-byte length buffer if it is not yet full.
   * @return the count returned by the channel read, or 0 if the length buffer is already full
   * @throws IOException if reading from the channel fails
   */
  private int read4Bytes() throws IOException {
    if (this.dataLengthBuffer.remaining() > 0) {
      return this.rpcServer.channelRead(channel, this.dataLengthBuffer);
    } else {
      return 0;
    }
  }

  /**
   * Read off the wire. If there is not enough data to read, update the connection state with what
   * we have and returns.
   * @return Returns -1 if failure (and caller will close connection), else zero or more.
   * @throws IOException if reading from the channel fails, if the announced data length is
   *           negative (and is not a ping), or if processing the request fails
   * @throws InterruptedException propagated from processing of a fully read request
   */
  public int readAndProcess() throws IOException, InterruptedException {
    // If we have not read the connection setup preamble, look to see if that is on the wire.
    if (!connectionPreambleRead) {
      int count = readPreamble();
      if (!connectionPreambleRead) {
        return count;
      }
    }

    // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the
    // integer length into the 4-byte this.dataLengthBuffer.
    int count = read4Bytes();
    if (count < 0 || dataLengthBuffer.remaining() > 0) {
      return count;
    }

    // We have read a length and we have read the preamble. It is either the connection header
    // or it is a request.
    if (data == null) {
      dataLengthBuffer.flip();
      int dataLength = dataLengthBuffer.getInt();
      if (dataLength == RpcClient.PING_CALL_ID) {
        if (!useWrap) { // covers the !useSasl too
          dataLengthBuffer.clear();
          return 0; // ping message
        }
      }
      if (dataLength < 0) { // A data length of zero is legal.
        throw new DoNotRetryIOException(
          "Unexpected data length " + dataLength + "!! from " + getHostAddress());
      }

      if (dataLength > this.rpcServer.maxRequestSize) {
        String msg = "RPC data length of " + dataLength + " received from " + getHostAddress() +
          " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \"" +
          SimpleRpcServer.MAX_REQUEST_SIZE +
          "\" on server to override this limit (not recommended)";
        SimpleRpcServer.LOG.warn(msg);

        if (connectionHeaderRead && connectionPreambleRead) {
          incRpcCount();
          respondRequestTooBig(msg);
        }
        // Close the connection
        return -1;
      }

      // Initialize this.data with a ByteBuff.
      // This call will allocate a ByteBuff to read request into and assign to this.data
      // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and
      // assign to this.callCleanup
      initByteBuffToReadInto(dataLength);

      // Increment the rpc count. This counter will be decreased when we write
      // the response. If we want the connection to be detected as idle properly, we
      // need to keep the inc / dec correct.
      incRpcCount();
    }

    count = channelDataRead(channel, data);

    if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
      process();
    }

    return count;
  }

  /**
   * Reads just the request header of an oversized request from the channel and queues an error
   * response carrying the header's call id, so the client can learn why it was rejected.
   * @param msg the error message to put in the response
   * @throws IOException if the header cannot be read or parsed from the channel
   */
  private void respondRequestTooBig(String msg) throws IOException {
    // We need an InputStream because we want to read only the request header
    // instead of the whole rpc.
    CodedInputStream cis = CodedInputStream.newInstance(newChannelInputStream());
    int headerSize = cis.readRawVarint32();
    Message.Builder builder = RequestHeader.newBuilder();
    ProtobufUtil.mergeFrom(builder, cis, headerSize);
    RequestHeader header = (RequestHeader) builder.build();

    // Notify the client about the offending request
    SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
        null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0,
        this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
    this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION);
    // Make sure the client recognizes the underlying exception
    // Otherwise, throw a DoNotRetryIOException.
    if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
      RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
      reqTooBig.setResponse(null, null, SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION, msg);
    } else {
      reqTooBig.setResponse(null, null, new DoNotRetryIOException(), msg);
    }
    // In most cases we will write out the response directly. If not, it is still OK to just
    // close the connection without writing out the reqTooBig response. Do not try to write
    // out directly here, and it will cause deserialization error if the connection is slow
    // and we have a half writing response in the queue.
    reqTooBig.sendResponseIfReady();
  }

  /**
   * Builds an InputStream that pulls one byte at a time from the socket channel.
   * <p>
   * {@code read()} honours the {@link InputStream#read()} contract: it returns the byte as an
   * unsigned value in the range 0..255, and returns -1 when the channel read yields no byte
   * (end of stream, or nothing available right now) rather than failing with an unchecked
   * {@link java.nio.BufferUnderflowException}.
   * @return a stream view over {@link #channel}
   */
  private InputStream newChannelInputStream() {
    final ByteBuffer buf = ByteBuffer.allocate(1);
    return new InputStream() {
      @Override
      public int read() throws IOException {
        int n = SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf);
        if (n <= 0) {
          // Nothing was read into buf, so it is still empty and ready for the next attempt.
          return -1;
        }
        buf.flip();
        // Mask to 0..255: a raw (signed) 0xFF would otherwise be returned as -1, i.e. "EOF".
        int x = buf.get() & 0xff;
        buf.clear();
        return x;
      }
    };
  }

  // It creates the ByteBuff and CallCleanup and assign to Connection instance.
  private void initByteBuffToReadInto(int length) {
    this.data = rpcServer.bbAllocator.allocate(length);
    this.callCleanup = data::release;
  }

  /**
   * Reads from the channel into the given buffer and reports the received byte count to the
   * server metrics.
   * @param channel the channel to read from
   * @param buf the buffer to fill
   * @return the count returned by {@code buf.read(channel)}
   * @throws IOException if the read fails
   */
  protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
    int count = buf.read(channel);
    if (count > 0) {
      this.rpcServer.metrics.receivedBytes(count);
    }
    return count;
  }

  /**
   * Process the data buffer and clean the connection state for the next call.
   */
  private void process() throws IOException, InterruptedException {
    data.rewind();
    try {
      if (skipInitialSaslHandshake) {
        // This frame is dropped without being processed; only the flag is reset.
        skipInitialSaslHandshake = false;
        return;
      }

      if (useSasl) {
        saslReadAndProcess(data);
      } else {
        processOneRpc(data);
      }

    } finally {
      dataLengthBuffer.clear(); // Clean for the next call
      data = null; // For the GC
      this.callCleanup = null;
    }
  }

  /**
   * Disposes SASL state, drops the read buffers and closes the channel and socket. Failures while
   * closing are ignored (logged at trace level); calling this on an already closed channel only
   * performs the state cleanup.
   */
  @Override
  public synchronized void close() {
    disposeSasl();
    data = null;
    callCleanup = null;
    if (!channel.isOpen()) {
      return;
    }
    try {
      socket.shutdownOutput();
    } catch (Exception ignored) {
      if (SimpleRpcServer.LOG.isTraceEnabled()) {
        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
      }
    }
    if (channel.isOpen()) {
      try {
        channel.close();
      } catch (Exception ignored) {
        // Best effort, same as the other close steps; keep a trace so it is not lost entirely.
        if (SimpleRpcServer.LOG.isTraceEnabled()) {
          SimpleRpcServer.LOG.trace("Ignored exception", ignored);
        }
      }
    }
    try {
      socket.close();
    } catch (Exception ignored) {
      if (SimpleRpcServer.LOG.isTraceEnabled()) {
        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
      }
    }
  }

  @Override
  public boolean isConnectionOpen() {
    return channel.isOpen();
  }

  /**
   * Creates a {@link SimpleServerCall} bound to this connection, stamped with the current time
   * and using this connection's responder.
   */
  @Override
  public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md,
      RequestHeader header, Message param, CellScanner cellScanner, long size,
      InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
    return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator,
        this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
  }

  /** Hands the response to the responder for writing on this connection. */
  @Override
  protected void doRespond(RpcResponse resp) throws IOException {
    responder.doRespond(this, resp);
  }
}