001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.ByteArrayInputStream; 021import java.io.IOException; 022import java.io.InputStream; 023import java.net.InetAddress; 024import java.net.Socket; 025import java.nio.ByteBuffer; 026import java.nio.channels.Channels; 027import java.nio.channels.ReadableByteChannel; 028import java.nio.channels.SocketChannel; 029import java.util.concurrent.ConcurrentLinkedDeque; 030import java.util.concurrent.atomic.LongAdder; 031import java.util.concurrent.locks.Lock; 032import java.util.concurrent.locks.ReentrantLock; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.ExtendedCellScanner; 035import org.apache.hadoop.hbase.client.VersionInfoUtil; 036import org.apache.hadoop.hbase.exceptions.RequestTooBigException; 037import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 038import org.apache.hadoop.hbase.nio.ByteBuff; 039import org.apache.hadoop.hbase.nio.SingleByteBuff; 040import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; 041import org.apache.hadoop.hbase.security.SaslStatus; 042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 043import org.apache.hadoop.io.BytesWritable; 044import org.apache.yetus.audience.InterfaceAudience; 045 046import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 047import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 048import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 049import org.apache.hbase.thirdparty.com.google.protobuf.Message; 050 051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 053 054/** Reads calls from a connection and queues them for handling. */ 055@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", 056 justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") 057@Deprecated 058@InterfaceAudience.Private 059class SimpleServerRpcConnection extends ServerRpcConnection { 060 061 final SocketChannel channel; 062 private ByteBuff data; 063 private ByteBuffer dataLengthBuffer; 064 private ByteBuffer preambleBuffer; 065 private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs 066 private long lastContact; 067 private final Socket socket; 068 final SimpleRpcServerResponder responder; 069 070 // If initial preamble with version and magic has been read or not. 071 private boolean connectionPreambleRead = false; 072 private boolean saslContextEstablished; 073 private ByteBuffer unwrappedData; 074 // When is this set? FindBugs wants to know! Says NP 075 private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4); 076 boolean useWrap = false; 077 078 final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>(); 079 final Lock responseWriteLock = new ReentrantLock(); 080 long lastSentTime = -1L; 081 082 public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel, 083 long lastContact) { 084 super(rpcServer); 085 this.channel = channel; 086 this.lastContact = lastContact; 087 this.data = null; 088 this.dataLengthBuffer = ByteBuffer.allocate(4); 089 this.socket = channel.socket(); 090 this.addr = socket.getInetAddress(); 091 if (addr == null) { 092 this.hostAddress = "*Unknown*"; 093 } else { 094 this.hostAddress = addr.getHostAddress(); 095 } 096 this.remotePort = socket.getPort(); 097 if (rpcServer.socketSendBufferSize != 0) { 098 try { 099 socket.setSendBufferSize(rpcServer.socketSendBufferSize); 100 } catch (IOException e) { 101 SimpleRpcServer.LOG.warn( 102 "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize); 103 } 104 } 105 this.responder = rpcServer.responder; 106 } 107 108 public void setLastContact(long lastContact) { 109 this.lastContact = lastContact; 110 } 111 112 public long getLastContact() { 113 return lastContact; 114 } 115 116 /* Return true if the connection has no outstanding rpc */ 117 boolean isIdle() { 118 return rpcCount.sum() == 0; 119 } 120 121 /* Decrement the outstanding RPC count */ 122 protected void decRpcCount() { 123 rpcCount.decrement(); 124 } 125 126 /* Increment the outstanding RPC count */ 127 protected void incRpcCount() { 128 rpcCount.increment(); 129 } 130 131 private int readPreamble() throws IOException { 132 if (preambleBuffer == null) { 133 preambleBuffer = ByteBuffer.allocate(6); 134 } 135 int count = this.rpcServer.channelRead(channel, preambleBuffer); 136 if (count < 0 || preambleBuffer.remaining() > 0) { 137 return count; 138 } 139 preambleBuffer.flip(); 140 PreambleResponse resp = processPreamble(preambleBuffer); 141 switch (resp) { 142 case SUCCEED: 143 preambleBuffer = null; // do not need it anymore 144 connectionPreambleRead = true; 145 return count; 146 case CONTINUE: 147 // wait for the next preamble header 148 preambleBuffer.clear(); 149 return count; 150 case CLOSE: 151 return -1; 152 default: 153 throw new IllegalArgumentException("Unknown preamble response: " + resp); 154 } 155 } 156 157 private int read4Bytes() throws IOException { 158 if (this.dataLengthBuffer.remaining() > 0) { 159 return this.rpcServer.channelRead(channel, this.dataLengthBuffer); 160 } else { 161 return 0; 162 } 163 } 164 165 private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException { 166 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf)); 167 // Read all RPCs contained in the inBuf, even partial ones 168 while (true) { 169 int count; 170 if (unwrappedDataLengthBuffer.remaining() > 0) { 171 count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer); 172 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) { 173 return; 174 } 175 } 176 177 if (unwrappedData == null) { 178 unwrappedDataLengthBuffer.flip(); 179 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); 180 181 if (unwrappedDataLength == RpcClient.PING_CALL_ID) { 182 if (RpcServer.LOG.isDebugEnabled()) RpcServer.LOG.debug("Received ping message"); 183 unwrappedDataLengthBuffer.clear(); 184 continue; // ping message 185 } 186 unwrappedData = ByteBuffer.allocate(unwrappedDataLength); 187 } 188 189 count = this.rpcServer.channelRead(ch, unwrappedData); 190 if (count <= 0 || unwrappedData.remaining() > 0) { 191 return; 192 } 193 194 if (unwrappedData.remaining() == 0) { 195 unwrappedDataLengthBuffer.clear(); 196 unwrappedData.flip(); 197 processOneRpc(new SingleByteBuff(unwrappedData)); 198 unwrappedData = null; 199 } 200 } 201 } 202 203 private void saslReadAndProcess(ByteBuff saslToken) throws IOException, InterruptedException { 204 if (saslContextEstablished) { 205 RpcServer.LOG.trace("Read input token of size={} for processing by saslServer.unwrap()", 206 saslToken.limit()); 207 if (!useWrap) { 208 processOneRpc(saslToken); 209 } else { 210 byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes(); 211 byte[] plaintextData = saslServer.unwrap(b, 0, b.length); 212 // release the request buffer as we have already unwrapped all its content 213 callCleanupIfNeeded(); 214 processUnwrappedData(plaintextData); 215 } 216 } else { 217 byte[] replyToken; 218 try { 219 try { 220 getOrCreateSaslServer(); 221 } catch (Exception e) { 222 RpcServer.LOG.error("Error when trying to create instance of HBaseSaslRpcServer " 223 + "with sasl provider: " + provider, e); 224 throw e; 225 } 226 RpcServer.LOG.debug("Created SASL server with mechanism={}", 227 provider.getSaslAuthMethod().getAuthMethod()); 228 RpcServer.LOG.debug( 229 "Read input token of size={} for processing by saslServer." + "evaluateResponse()", 230 saslToken.limit()); 231 replyToken = saslServer 232 .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes()); 233 } catch (IOException e) { 234 RpcServer.LOG.debug("Failed to execute SASL handshake", e); 235 Throwable sendToClient = HBaseSaslRpcServer.unwrap(e); 236 doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(), 237 sendToClient.getLocalizedMessage()); 238 this.rpcServer.metrics.authenticationFailure(); 239 String clientIP = this.toString(); 240 // attempting user could be null 241 RpcServer.AUDITLOG.warn("{}{}: {}", RpcServer.AUTH_FAILED_FOR, clientIP, 242 saslServer.getAttemptingUser()); 243 throw e; 244 } finally { 245 // release the request buffer as we have already unwrapped all its content 246 callCleanupIfNeeded(); 247 } 248 if (replyToken != null) { 249 if (RpcServer.LOG.isDebugEnabled()) { 250 RpcServer.LOG.debug("Will send token of size " + replyToken.length + " from saslServer."); 251 } 252 doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null); 253 } 254 if (saslServer.isComplete()) { 255 String qop = saslServer.getNegotiatedQop(); 256 useWrap = qop != null && !"auth".equalsIgnoreCase(qop); 257 ugi = 258 provider.getAuthorizedUgi(saslServer.getAuthorizationID(), this.rpcServer.secretManager); 259 RpcServer.LOG.debug( 260 "SASL server context established. Authenticated client: {}. Negotiated QoP is {}", ugi, 261 qop); 262 this.rpcServer.metrics.authenticationSuccess(); 263 RpcServer.AUDITLOG.info(RpcServer.AUTH_SUCCESSFUL_FOR + ugi); 264 saslContextEstablished = true; 265 } 266 } 267 } 268 269 /** 270 * Read off the wire. If there is not enough data to read, update the connection state with what 271 * we have and returns. 272 * @return Returns -1 if failure (and caller will close connection), else zero or more. 273 */ 274 public int readAndProcess() throws IOException, InterruptedException { 275 // If we have not read the connection setup preamble, look to see if that is on the wire. 276 if (!connectionPreambleRead) { 277 int count = readPreamble(); 278 if (!connectionPreambleRead) { 279 return count; 280 } 281 } 282 283 // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the 284 // integer length into the 4-byte this.dataLengthBuffer. 285 int count = read4Bytes(); 286 if (count < 0 || dataLengthBuffer.remaining() > 0) { 287 return count; 288 } 289 290 // We have read a length and we have read the preamble. It is either the connection header 291 // or it is a request. 292 if (data == null) { 293 dataLengthBuffer.flip(); 294 int dataLength = dataLengthBuffer.getInt(); 295 if (dataLength == RpcClient.PING_CALL_ID) { 296 if (!useWrap) { // covers the !useSasl too 297 dataLengthBuffer.clear(); 298 return 0; // ping message 299 } 300 } 301 if (dataLength < 0) { // A data length of zero is legal. 302 throw new DoNotRetryIOException( 303 "Unexpected data length " + dataLength + "!! from " + getHostAddress()); 304 } 305 306 if (dataLength > this.rpcServer.maxRequestSize) { 307 String msg = "RPC data length of " + dataLength + " received from " + getHostAddress() 308 + " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \"" 309 + SimpleRpcServer.MAX_REQUEST_SIZE 310 + "\" on server to override this limit (not recommended)"; 311 SimpleRpcServer.LOG.warn(msg); 312 313 if (connectionHeaderRead && connectionPreambleRead) { 314 incRpcCount(); 315 // Construct InputStream for the non-blocking SocketChannel 316 // We need the InputStream because we want to read only the request header 317 // instead of the whole rpc. 318 ByteBuffer buf = ByteBuffer.allocate(1); 319 InputStream is = new InputStream() { 320 @Override 321 public int read() throws IOException { 322 SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf); 323 buf.flip(); 324 int x = buf.get(); 325 buf.flip(); 326 return x; 327 } 328 }; 329 CodedInputStream cis = CodedInputStream.newInstance(is); 330 int headerSize = cis.readRawVarint32(); 331 Message.Builder builder = RequestHeader.newBuilder(); 332 ProtobufUtil.mergeFrom(builder, cis, headerSize); 333 RequestHeader header = (RequestHeader) builder.build(); 334 335 // Notify the client about the offending request 336 SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null, 337 null, null, null, this, 0, this.addr, EnvironmentEdgeManager.currentTime(), 0, 338 this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder); 339 RequestTooBigException reqTooBigEx = new RequestTooBigException(msg); 340 this.rpcServer.metrics.exception(reqTooBigEx); 341 // Make sure the client recognizes the underlying exception 342 // Otherwise, throw a DoNotRetryIOException. 343 if ( 344 VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(), 345 RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION) 346 ) { 347 reqTooBig.setResponse(null, null, reqTooBigEx, msg); 348 } else { 349 reqTooBig.setResponse(null, null, new DoNotRetryIOException(msg), msg); 350 } 351 // In most cases we will write out the response directly. If not, it is still OK to just 352 // close the connection without writing out the reqTooBig response. Do not try to write 353 // out directly here, and it will cause deserialization error if the connection is slow 354 // and we have a half writing response in the queue. 355 reqTooBig.sendResponseIfReady(); 356 } 357 // Close the connection 358 return -1; 359 } 360 361 // Initialize this.data with a ByteBuff. 362 // This call will allocate a ByteBuff to read request into and assign to this.data 363 // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and 364 // assign to this.callCleanup 365 initByteBuffToReadInto(dataLength); 366 367 // Increment the rpc count. This counter will be decreased when we write 368 // the response. If we want the connection to be detected as idle properly, we 369 // need to keep the inc / dec correct. 370 incRpcCount(); 371 } 372 373 count = channelDataRead(channel, data); 374 375 if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 376 process(); 377 } 378 379 return count; 380 } 381 382 // It creates the ByteBuff and CallCleanup and assign to Connection instance. 383 private void initByteBuffToReadInto(int length) { 384 this.data = rpcServer.bbAllocator.allocate(length); 385 this.callCleanup = data::release; 386 } 387 388 protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException { 389 int count = buf.read(channel); 390 if (count > 0) { 391 this.rpcServer.metrics.receivedBytes(count); 392 } 393 return count; 394 } 395 396 /** 397 * Process the data buffer and clean the connection state for the next call. 398 */ 399 private void process() throws IOException, InterruptedException { 400 data.rewind(); 401 try { 402 if (skipInitialSaslHandshake) { 403 skipInitialSaslHandshake = false; 404 return; 405 } 406 407 if (useSasl) { 408 saslReadAndProcess(data); 409 } else { 410 processOneRpc(data); 411 } 412 } catch (Exception e) { 413 callCleanupIfNeeded(); 414 throw e; 415 } finally { 416 dataLengthBuffer.clear(); // Clean for the next call 417 data = null; // For the GC 418 this.callCleanup = null; 419 } 420 } 421 422 @Override 423 public synchronized void close() { 424 disposeSasl(); 425 data = null; 426 callCleanupIfNeeded(); 427 if (!channel.isOpen()) { 428 return; 429 } 430 try { 431 socket.shutdownOutput(); 432 } catch (Exception ignored) { 433 if (SimpleRpcServer.LOG.isTraceEnabled()) { 434 SimpleRpcServer.LOG.trace("Ignored exception", ignored); 435 } 436 } 437 if (channel.isOpen()) { 438 try { 439 channel.close(); 440 } catch (Exception ignored) { 441 } 442 } 443 try { 444 socket.close(); 445 } catch (Exception ignored) { 446 if (SimpleRpcServer.LOG.isTraceEnabled()) { 447 SimpleRpcServer.LOG.trace("Ignored exception", ignored); 448 } 449 } 450 } 451 452 @Override 453 public boolean isConnectionOpen() { 454 return channel.isOpen(); 455 } 456 457 @Override 458 public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md, 459 RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size, 460 InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { 461 return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size, 462 remoteAddress, EnvironmentEdgeManager.currentTime(), timeout, this.rpcServer.bbAllocator, 463 this.rpcServer.cellBlockBuilder, reqCleanup, this.responder); 464 } 465 466 @Override 467 protected void doRespond(RpcResponse resp) throws IOException { 468 responder.doRespond(this, resp); 469 } 470}