001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.net.InetAddress; 023import java.net.Socket; 024import java.nio.ByteBuffer; 025import java.nio.channels.ReadableByteChannel; 026import java.nio.channels.SocketChannel; 027import java.util.concurrent.ConcurrentLinkedDeque; 028import java.util.concurrent.atomic.LongAdder; 029import java.util.concurrent.locks.Lock; 030import java.util.concurrent.locks.ReentrantLock; 031import org.apache.hadoop.hbase.CellScanner; 032import org.apache.hadoop.hbase.DoNotRetryIOException; 033import org.apache.hadoop.hbase.client.VersionInfoUtil; 034import org.apache.hadoop.hbase.exceptions.RequestTooBigException; 035import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 036import org.apache.hadoop.hbase.nio.ByteBuff; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.yetus.audience.InterfaceAudience; 039 040import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 041import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 042import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 043import org.apache.hbase.thirdparty.com.google.protobuf.Message; 044 045import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 047 048/** Reads calls from a connection and queues them for handling. */ 049@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", 050 justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") 051@Deprecated 052@InterfaceAudience.Private 053class SimpleServerRpcConnection extends ServerRpcConnection { 054 055 final SocketChannel channel; 056 private ByteBuff data; 057 private ByteBuffer dataLengthBuffer; 058 private ByteBuffer preambleBuffer; 059 private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs 060 private long lastContact; 061 private final Socket socket; 062 final SimpleRpcServerResponder responder; 063 064 // If initial preamble with version and magic has been read or not. 065 private boolean connectionPreambleRead = false; 066 067 final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>(); 068 final Lock responseWriteLock = new ReentrantLock(); 069 long lastSentTime = -1L; 070 071 public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel, 072 long lastContact) { 073 super(rpcServer); 074 this.channel = channel; 075 this.lastContact = lastContact; 076 this.data = null; 077 this.dataLengthBuffer = ByteBuffer.allocate(4); 078 this.socket = channel.socket(); 079 this.addr = socket.getInetAddress(); 080 if (addr == null) { 081 this.hostAddress = "*Unknown*"; 082 } else { 083 this.hostAddress = addr.getHostAddress(); 084 } 085 this.remotePort = socket.getPort(); 086 if (rpcServer.socketSendBufferSize != 0) { 087 try { 088 socket.setSendBufferSize(rpcServer.socketSendBufferSize); 089 } catch (IOException e) { 090 SimpleRpcServer.LOG.warn( 091 "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize); 092 } 093 } 094 this.responder = rpcServer.responder; 095 } 096 097 public void setLastContact(long lastContact) { 098 this.lastContact = lastContact; 099 } 100 101 public long getLastContact() { 102 return lastContact; 103 } 104 105 /* Return true if the connection has no outstanding rpc */ 106 boolean isIdle() { 107 return rpcCount.sum() == 0; 108 } 109 110 /* Decrement the outstanding RPC count */ 111 protected void decRpcCount() { 112 rpcCount.decrement(); 113 } 114 115 /* Increment the outstanding RPC count */ 116 protected void incRpcCount() { 117 rpcCount.increment(); 118 } 119 120 private int readPreamble() throws IOException { 121 if (preambleBuffer == null) { 122 preambleBuffer = ByteBuffer.allocate(6); 123 } 124 int count = this.rpcServer.channelRead(channel, preambleBuffer); 125 if (count < 0 || preambleBuffer.remaining() > 0) { 126 return count; 127 } 128 preambleBuffer.flip(); 129 if (!processPreamble(preambleBuffer)) { 130 return -1; 131 } 132 preambleBuffer = null; // do not need it anymore 133 connectionPreambleRead = true; 134 return count; 135 } 136 137 private int read4Bytes() throws IOException { 138 if (this.dataLengthBuffer.remaining() > 0) { 139 return this.rpcServer.channelRead(channel, this.dataLengthBuffer); 140 } else { 141 return 0; 142 } 143 } 144 145 /** 146 * Read off the wire. If there is not enough data to read, update the connection state with what 147 * we have and returns. 148 * @return Returns -1 if failure (and caller will close connection), else zero or more. nn 149 */ 150 public int readAndProcess() throws IOException, InterruptedException { 151 // If we have not read the connection setup preamble, look to see if that is on the wire. 152 if (!connectionPreambleRead) { 153 int count = readPreamble(); 154 if (!connectionPreambleRead) { 155 return count; 156 } 157 } 158 159 // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the 160 // integer length into the 4-byte this.dataLengthBuffer. 161 int count = read4Bytes(); 162 if (count < 0 || dataLengthBuffer.remaining() > 0) { 163 return count; 164 } 165 166 // We have read a length and we have read the preamble. It is either the connection header 167 // or it is a request. 168 if (data == null) { 169 dataLengthBuffer.flip(); 170 int dataLength = dataLengthBuffer.getInt(); 171 if (dataLength == RpcClient.PING_CALL_ID) { 172 if (!useWrap) { // covers the !useSasl too 173 dataLengthBuffer.clear(); 174 return 0; // ping message 175 } 176 } 177 if (dataLength < 0) { // A data length of zero is legal. 178 throw new DoNotRetryIOException( 179 "Unexpected data length " + dataLength + "!! from " + getHostAddress()); 180 } 181 182 if (dataLength > this.rpcServer.maxRequestSize) { 183 String msg = "RPC data length of " + dataLength + " received from " + getHostAddress() 184 + " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \"" 185 + SimpleRpcServer.MAX_REQUEST_SIZE 186 + "\" on server to override this limit (not recommended)"; 187 SimpleRpcServer.LOG.warn(msg); 188 189 if (connectionHeaderRead && connectionPreambleRead) { 190 incRpcCount(); 191 // Construct InputStream for the non-blocking SocketChannel 192 // We need the InputStream because we want to read only the request header 193 // instead of the whole rpc. 194 ByteBuffer buf = ByteBuffer.allocate(1); 195 InputStream is = new InputStream() { 196 @Override 197 public int read() throws IOException { 198 SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf); 199 buf.flip(); 200 int x = buf.get(); 201 buf.flip(); 202 return x; 203 } 204 }; 205 CodedInputStream cis = CodedInputStream.newInstance(is); 206 int headerSize = cis.readRawVarint32(); 207 Message.Builder builder = RequestHeader.newBuilder(); 208 ProtobufUtil.mergeFrom(builder, cis, headerSize); 209 RequestHeader header = (RequestHeader) builder.build(); 210 211 // Notify the client about the offending request 212 SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null, 213 null, null, null, this, 0, this.addr, EnvironmentEdgeManager.currentTime(), 0, 214 this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder); 215 RequestTooBigException reqTooBigEx = new RequestTooBigException(msg); 216 this.rpcServer.metrics.exception(reqTooBigEx); 217 // Make sure the client recognizes the underlying exception 218 // Otherwise, throw a DoNotRetryIOException. 219 if ( 220 VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(), 221 RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION) 222 ) { 223 reqTooBig.setResponse(null, null, reqTooBigEx, msg); 224 } else { 225 reqTooBig.setResponse(null, null, new DoNotRetryIOException(msg), msg); 226 } 227 // In most cases we will write out the response directly. If not, it is still OK to just 228 // close the connection without writing out the reqTooBig response. Do not try to write 229 // out directly here, and it will cause deserialization error if the connection is slow 230 // and we have a half writing response in the queue. 231 reqTooBig.sendResponseIfReady(); 232 } 233 // Close the connection 234 return -1; 235 } 236 237 // Initialize this.data with a ByteBuff. 238 // This call will allocate a ByteBuff to read request into and assign to this.data 239 // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and 240 // assign to this.callCleanup 241 initByteBuffToReadInto(dataLength); 242 243 // Increment the rpc count. This counter will be decreased when we write 244 // the response. If we want the connection to be detected as idle properly, we 245 // need to keep the inc / dec correct. 246 incRpcCount(); 247 } 248 249 count = channelDataRead(channel, data); 250 251 if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 252 process(); 253 } 254 255 return count; 256 } 257 258 // It creates the ByteBuff and CallCleanup and assign to Connection instance. 259 private void initByteBuffToReadInto(int length) { 260 this.data = rpcServer.bbAllocator.allocate(length); 261 this.callCleanup = data::release; 262 } 263 264 protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException { 265 int count = buf.read(channel); 266 if (count > 0) { 267 this.rpcServer.metrics.receivedBytes(count); 268 } 269 return count; 270 } 271 272 /** 273 * Process the data buffer and clean the connection state for the next call. 274 */ 275 private void process() throws IOException, InterruptedException { 276 data.rewind(); 277 try { 278 if (skipInitialSaslHandshake) { 279 skipInitialSaslHandshake = false; 280 return; 281 } 282 283 if (useSasl) { 284 saslReadAndProcess(data); 285 } else { 286 processOneRpc(data); 287 } 288 } catch (Exception e) { 289 callCleanupIfNeeded(); 290 throw e; 291 } finally { 292 dataLengthBuffer.clear(); // Clean for the next call 293 data = null; // For the GC 294 this.callCleanup = null; 295 } 296 } 297 298 @Override 299 public synchronized void close() { 300 disposeSasl(); 301 data = null; 302 callCleanupIfNeeded(); 303 if (!channel.isOpen()) { 304 return; 305 } 306 try { 307 socket.shutdownOutput(); 308 } catch (Exception ignored) { 309 if (SimpleRpcServer.LOG.isTraceEnabled()) { 310 SimpleRpcServer.LOG.trace("Ignored exception", ignored); 311 } 312 } 313 if (channel.isOpen()) { 314 try { 315 channel.close(); 316 } catch (Exception ignored) { 317 } 318 } 319 try { 320 socket.close(); 321 } catch (Exception ignored) { 322 if (SimpleRpcServer.LOG.isTraceEnabled()) { 323 SimpleRpcServer.LOG.trace("Ignored exception", ignored); 324 } 325 } 326 } 327 328 @Override 329 public boolean isConnectionOpen() { 330 return channel.isOpen(); 331 } 332 333 @Override 334 public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md, 335 RequestHeader header, Message param, CellScanner cellScanner, long size, 336 InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { 337 return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size, 338 remoteAddress, EnvironmentEdgeManager.currentTime(), timeout, this.rpcServer.bbAllocator, 339 this.rpcServer.cellBlockBuilder, reqCleanup, this.responder); 340 } 341 342 @Override 343 protected void doRespond(RpcResponse resp) throws IOException { 344 responder.doRespond(this, resp); 345 } 346}