001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.net.InetAddress; 022import java.nio.ByteBuffer; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Optional; 026import java.util.concurrent.atomic.AtomicInteger; 027import org.apache.hadoop.hbase.CellScanner; 028import org.apache.hadoop.hbase.DoNotRetryIOException; 029import org.apache.hadoop.hbase.io.ByteBuffAllocator; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.apache.hadoop.hbase.exceptions.RegionMovedException; 032import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; 033import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 034import org.apache.hadoop.hbase.security.User; 035import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 036import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; 037import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 038import org.apache.hbase.thirdparty.com.google.protobuf.Message; 039import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 045import org.apache.hadoop.hbase.util.ByteBufferUtils; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.util.StringUtils; 048 049/** 050 * Datastructure that holds all necessary to a method invocation and then afterward, carries 051 * the result. 052 */ 053@InterfaceAudience.Private 054public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse { 055 056 protected final int id; // the client's call id 057 protected final BlockingService service; 058 protected final MethodDescriptor md; 059 protected final RequestHeader header; 060 protected Message param; // the parameter passed 061 // Optional cell data passed outside of protobufs. 062 protected final CellScanner cellScanner; 063 protected final T connection; // connection to client 064 protected final long receiveTime; // the time received when response is null 065 // the time served when response is not null 066 protected final int timeout; 067 protected long startTime; 068 protected final long deadline;// the deadline to handle this call, if exceed we can drop it. 069 070 protected final ByteBuffAllocator bbAllocator; 071 072 protected final CellBlockBuilder cellBlockBuilder; 073 074 /** 075 * Chain of buffers to send as response. 076 */ 077 protected BufferChain response; 078 079 protected final long size; // size of current call 080 protected boolean isError; 081 protected ByteBufferListOutputStream cellBlockStream = null; 082 protected CallCleanup reqCleanup = null; 083 084 protected final User user; 085 protected final InetAddress remoteAddress; 086 protected RpcCallback rpcCallback; 087 088 private long responseCellSize = 0; 089 private long responseBlockSize = 0; 090 // cumulative size of serialized exceptions 091 private long exceptionSize = 0; 092 private final boolean retryImmediatelySupported; 093 094 // This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the 095 // second bit is for WAL reference. We can only call release if both of them are zero. The reason 096 // why we can not use a general reference counting is that, we may call cleanup multiple times in 097 // the current implementation. We should fix this in the future. 098 private final AtomicInteger reference = new AtomicInteger(0b01); 099 100 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", 101 justification = "Can't figure why this complaint is happening... see below") 102 ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, 103 Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress, 104 long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator, 105 CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { 106 this.id = id; 107 this.service = service; 108 this.md = md; 109 this.header = header; 110 this.param = param; 111 this.cellScanner = cellScanner; 112 this.connection = connection; 113 this.receiveTime = receiveTime; 114 this.response = null; 115 this.isError = false; 116 this.size = size; 117 if (connection != null) { 118 this.user = connection.user; 119 this.retryImmediatelySupported = connection.retryImmediatelySupported; 120 } else { 121 this.user = null; 122 this.retryImmediatelySupported = false; 123 } 124 this.remoteAddress = remoteAddress; 125 this.timeout = timeout; 126 this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE; 127 this.bbAllocator = byteBuffAllocator; 128 this.cellBlockBuilder = cellBlockBuilder; 129 this.reqCleanup = reqCleanup; 130 } 131 132 /** 133 * Call is done. Execution happened and we returned results to client. It is 134 * now safe to cleanup. 135 */ 136 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 137 justification = "Presume the lock on processing request held by caller is protection enough") 138 @Override 139 public void done() { 140 if (this.cellBlockStream != null) { 141 // This will return back the BBs which we got from pool. 142 this.cellBlockStream.releaseResources(); 143 this.cellBlockStream = null; 144 } 145 // If the call was run successfuly, we might have already returned the BB 146 // back to pool. No worries..Then inputCellBlock will be null 147 cleanup(); 148 } 149 150 private void release(int mask) { 151 for (;;) { 152 int ref = reference.get(); 153 if ((ref & mask) == 0) { 154 return; 155 } 156 int nextRef = ref & (~mask); 157 if (reference.compareAndSet(ref, nextRef)) { 158 if (nextRef == 0) { 159 if (this.reqCleanup != null) { 160 this.reqCleanup.run(); 161 } 162 } 163 return; 164 } 165 } 166 } 167 168 @Override 169 public void cleanup() { 170 release(0b01); 171 } 172 173 public void retainByWAL() { 174 for (;;) { 175 int ref = reference.get(); 176 int nextRef = ref | 0b10; 177 if (reference.compareAndSet(ref, nextRef)) { 178 return; 179 } 180 } 181 } 182 183 public void releaseByWAL() { 184 release(0b10); 185 } 186 187 @Override 188 public String toString() { 189 return toShortString() + " param: " + 190 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + 191 " connection: " + connection.toString(); 192 } 193 194 @Override 195 public RequestHeader getHeader() { 196 return this.header; 197 } 198 199 @Override 200 public int getPriority() { 201 return this.header.getPriority(); 202 } 203 204 /* 205 * Short string representation without param info because param itself could be huge depends on 206 * the payload of a command 207 */ 208 @Override 209 public String toShortString() { 210 String serviceName = this.connection.service != null ? 211 this.connection.service.getDescriptorForType().getName() : "null"; 212 return "callId: " + this.id + " service: " + serviceName + 213 " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") + 214 " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + 215 " connection: " + connection.toString() + 216 " deadline: " + deadline; 217 } 218 219 @Override 220 public synchronized void setResponse(Message m, final CellScanner cells, 221 Throwable t, String errorMsg) { 222 if (this.isError) return; 223 if (t != null) this.isError = true; 224 BufferChain bc = null; 225 try { 226 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); 227 // Call id. 228 headerBuilder.setCallId(this.id); 229 if (t != null) { 230 setExceptionResponse(t, errorMsg, headerBuilder); 231 } 232 // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the 233 // reservoir when finished. This is hacky and the hack is not contained but benefits are 234 // high when we can avoid a big buffer allocation on each rpc. 235 List<ByteBuffer> cellBlock = null; 236 int cellBlockSize = 0; 237 if (bbAllocator.isReservoirEnabled()) { 238 this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec, 239 this.connection.compressionCodec, cells, bbAllocator); 240 if (this.cellBlockStream != null) { 241 cellBlock = this.cellBlockStream.getByteBuffers(); 242 cellBlockSize = this.cellBlockStream.size(); 243 } 244 } else { 245 ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec, 246 this.connection.compressionCodec, cells); 247 if (b != null) { 248 cellBlockSize = b.remaining(); 249 cellBlock = new ArrayList<>(1); 250 cellBlock.add(b); 251 } 252 } 253 254 if (cellBlockSize > 0) { 255 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); 256 // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. 257 cellBlockBuilder.setLength(cellBlockSize); 258 headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); 259 } 260 Message header = headerBuilder.build(); 261 ByteBuffer headerBuf = 262 createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock); 263 ByteBuffer[] responseBufs = null; 264 int cellBlockBufferSize = 0; 265 if (cellBlock != null) { 266 cellBlockBufferSize = cellBlock.size(); 267 responseBufs = new ByteBuffer[1 + cellBlockBufferSize]; 268 } else { 269 responseBufs = new ByteBuffer[1]; 270 } 271 responseBufs[0] = headerBuf; 272 if (cellBlock != null) { 273 for (int i = 0; i < cellBlockBufferSize; i++) { 274 responseBufs[i + 1] = cellBlock.get(i); 275 } 276 } 277 bc = new BufferChain(responseBufs); 278 if (connection.useWrap) { 279 bc = wrapWithSasl(bc); 280 } 281 } catch (IOException e) { 282 RpcServer.LOG.warn("Exception while creating response " + e); 283 } 284 this.response = bc; 285 // Once a response message is created and set to this.response, this Call can be treated as 286 // done. The Responder thread will do the n/w write of this message back to client. 287 if (this.rpcCallback != null) { 288 try { 289 this.rpcCallback.run(); 290 } catch (Exception e) { 291 // Don't allow any exception here to kill this handler thread. 292 RpcServer.LOG.warn("Exception while running the Rpc Callback.", e); 293 } 294 } 295 } 296 297 static void setExceptionResponse(Throwable t, String errorMsg, 298 ResponseHeader.Builder headerBuilder) { 299 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); 300 exceptionBuilder.setExceptionClassName(t.getClass().getName()); 301 exceptionBuilder.setStackTrace(errorMsg); 302 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException); 303 if (t instanceof RegionMovedException) { 304 // Special casing for this exception. This is only one carrying a payload. 305 // Do this instead of build a generic system for allowing exceptions carry 306 // any kind of payload. 307 RegionMovedException rme = (RegionMovedException)t; 308 exceptionBuilder.setHostname(rme.getHostname()); 309 exceptionBuilder.setPort(rme.getPort()); 310 } 311 // Set the exception as the result of the method invocation. 312 headerBuilder.setException(exceptionBuilder.build()); 313 } 314 315 static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, 316 int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException { 317 // Organize the response as a set of bytebuffers rather than collect it all together inside 318 // one big byte array; save on allocations. 319 // for writing the header, we check if there is available space in the buffers 320 // created for the cellblock itself. If there is space for the header, we reuse 321 // the last buffer in the cellblock. This applies to the cellblock created from the 322 // pool or even the onheap cellblock buffer in case there is no pool enabled. 323 // Possible reuse would avoid creating a temporary array for storing the header every time. 324 ByteBuffer possiblePBBuf = 325 (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; 326 int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, 327 resultVintSize = 0; 328 if (header != null) { 329 headerSerializedSize = header.getSerializedSize(); 330 headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize); 331 } 332 if (result != null) { 333 resultSerializedSize = result.getSerializedSize(); 334 resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize); 335 } 336 // calculate the total size 337 int totalSize = headerSerializedSize + headerVintSize 338 + (resultSerializedSize + resultVintSize) 339 + cellBlockSize; 340 int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize 341 + resultVintSize + Bytes.SIZEOF_INT; 342 // Only if the last buffer has enough space for header use it. Else allocate 343 // a new buffer. Assume they are all flipped 344 if (possiblePBBuf != null 345 && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { 346 // duplicate the buffer. This is where the header is going to be written 347 ByteBuffer pbBuf = possiblePBBuf.duplicate(); 348 // get the current limit 349 int limit = pbBuf.limit(); 350 // Position such that we write the header to the end of the buffer 351 pbBuf.position(limit); 352 // limit to the header size 353 pbBuf.limit(totalPBSize + limit); 354 // mark the current position 355 pbBuf.mark(); 356 writeToCOS(result, header, totalSize, pbBuf); 357 // reset the buffer back to old position 358 pbBuf.reset(); 359 return pbBuf; 360 } else { 361 return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); 362 } 363 } 364 365 private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) 366 throws IOException { 367 ByteBufferUtils.putInt(pbBuf, totalSize); 368 // create COS that works on BB 369 CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf); 370 if (header != null) { 371 cos.writeMessageNoTag(header); 372 } 373 if (result != null) { 374 cos.writeMessageNoTag(result); 375 } 376 cos.flush(); 377 cos.checkNoSpaceLeft(); 378 } 379 380 private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, 381 int totalSize, int totalPBSize) throws IOException { 382 ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize); 383 writeToCOS(result, header, totalSize, pbBuf); 384 pbBuf.flip(); 385 return pbBuf; 386 } 387 388 protected BufferChain wrapWithSasl(BufferChain bc) 389 throws IOException { 390 if (!this.connection.useSasl) return bc; 391 // Looks like no way around this; saslserver wants a byte array. I have to make it one. 392 // THIS IS A BIG UGLY COPY. 393 byte [] responseBytes = bc.getBytes(); 394 byte [] token; 395 // synchronization may be needed since there can be multiple Handler 396 // threads using saslServer or Crypto AES to wrap responses. 397 if (connection.useCryptoAesWrap) { 398 // wrap with Crypto AES 399 synchronized (connection.cryptoAES) { 400 token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length); 401 } 402 } else { 403 synchronized (connection.saslServer) { 404 token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); 405 } 406 } 407 if (RpcServer.LOG.isTraceEnabled()) { 408 RpcServer.LOG.trace("Adding saslServer wrapped token of size " + token.length 409 + " as call response."); 410 } 411 412 ByteBuffer[] responseBufs = new ByteBuffer[2]; 413 responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length)); 414 responseBufs[1] = ByteBuffer.wrap(token); 415 return new BufferChain(responseBufs); 416 } 417 418 @Override 419 public long disconnectSince() { 420 if (!this.connection.isConnectionOpen()) { 421 return System.currentTimeMillis() - receiveTime; 422 } else { 423 return -1L; 424 } 425 } 426 427 @Override 428 public boolean isClientCellBlockSupported() { 429 return this.connection != null && this.connection.codec != null; 430 } 431 432 @Override 433 public long getResponseCellSize() { 434 return responseCellSize; 435 } 436 437 @Override 438 public void incrementResponseCellSize(long cellSize) { 439 responseCellSize += cellSize; 440 } 441 442 @Override 443 public long getResponseBlockSize() { 444 return responseBlockSize; 445 } 446 447 @Override 448 public void incrementResponseBlockSize(long blockSize) { 449 responseBlockSize += blockSize; 450 } 451 452 @Override 453 public long getResponseExceptionSize() { 454 return exceptionSize; 455 } 456 @Override 457 public void incrementResponseExceptionSize(long exSize) { 458 exceptionSize += exSize; 459 } 460 461 @Override 462 public long getSize() { 463 return this.size; 464 } 465 466 @Override 467 public long getDeadline() { 468 return deadline; 469 } 470 471 @Override 472 public Optional<User> getRequestUser() { 473 return Optional.ofNullable(user); 474 } 475 476 @Override 477 public InetAddress getRemoteAddress() { 478 return remoteAddress; 479 } 480 481 @Override 482 public VersionInfo getClientVersionInfo() { 483 return connection.getVersionInfo(); 484 } 485 486 @Override 487 public synchronized void setCallBack(RpcCallback callback) { 488 this.rpcCallback = callback; 489 } 490 491 @Override 492 public boolean isRetryImmediatelySupported() { 493 return retryImmediatelySupported; 494 } 495 496 @Override 497 public BlockingService getService() { 498 return service; 499 } 500 501 @Override 502 public MethodDescriptor getMethod() { 503 return md; 504 } 505 506 @Override 507 public Message getParam() { 508 return param; 509 } 510 511 @Override 512 public CellScanner getCellScanner() { 513 return cellScanner; 514 } 515 516 @Override 517 public long getReceiveTime() { 518 return receiveTime; 519 } 520 521 @Override 522 public long getStartTime() { 523 return startTime; 524 } 525 526 @Override 527 public void setStartTime(long t) { 528 this.startTime = t; 529 } 530 531 @Override 532 public int getTimeout() { 533 return timeout; 534 } 535 536 @Override 537 public int getRemotePort() { 538 return connection.getRemotePort(); 539 } 540 541 @Override 542 public synchronized BufferChain getResponse() { 543 return response; 544 } 545}