/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.errorprone.annotations.RestrictedApi;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.util.StringUtils; 044import org.apache.yetus.audience.InterfaceAudience; 045 046import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 047import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; 048import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 049import org.apache.hbase.thirdparty.com.google.protobuf.Message; 050 051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 057 058/** 059 * Datastructure that holds all necessary to a method invocation and then afterward, carries the 060 * result. 061 */ 062@InterfaceAudience.Private 063public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse { 064 065 protected final int id; // the client's call id 066 protected final BlockingService service; 067 protected final MethodDescriptor md; 068 protected final RequestHeader header; 069 protected Message param; // the parameter passed 070 // Optional cell data passed outside of protobufs. 071 protected final CellScanner cellScanner; 072 protected final T connection; // connection to client 073 protected final long receiveTime; // the time received when response is null 074 // the time served when response is not null 075 protected final int timeout; 076 protected long startTime; 077 protected final long deadline;// the deadline to handle this call, if exceed we can drop it. 
078 079 protected final ByteBuffAllocator bbAllocator; 080 081 protected final CellBlockBuilder cellBlockBuilder; 082 083 /** 084 * Chain of buffers to send as response. 085 */ 086 protected BufferChain response; 087 088 protected final long size; // size of current call 089 protected boolean isError; 090 protected ByteBufferListOutputStream cellBlockStream = null; 091 protected CallCleanup reqCleanup = null; 092 093 protected final User user; 094 protected final InetAddress remoteAddress; 095 protected RpcCallback rpcCallback; 096 097 private long responseCellSize = 0; 098 private long responseBlockSize = 0; 099 // cumulative size of serialized exceptions 100 private long exceptionSize = 0; 101 private final boolean retryImmediatelySupported; 102 103 // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and 104 // the rest of the bits are for WAL reference count. We can only call release if all of them are 105 // zero. The reason why we can not use a general reference counting is that, we may call cleanup 106 // multiple times in the current implementation. We should fix this in the future. 107 // The refCount here will start as 0x80000000 and increment with every WAL reference and decrement 108 // from WAL side on release 109 private final AtomicInteger reference = new AtomicInteger(0x80000000); 110 111 private final Span span; 112 113 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", 114 justification = "Can't figure why this complaint is happening... 
see below") 115 ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, 116 Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress, 117 long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator, 118 CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { 119 this.id = id; 120 this.service = service; 121 this.md = md; 122 this.header = header; 123 this.param = param; 124 this.cellScanner = cellScanner; 125 this.connection = connection; 126 this.receiveTime = receiveTime; 127 this.response = null; 128 this.isError = false; 129 this.size = size; 130 if (connection != null) { 131 this.user = connection.user; 132 this.retryImmediatelySupported = connection.retryImmediatelySupported; 133 } else { 134 this.user = null; 135 this.retryImmediatelySupported = false; 136 } 137 this.remoteAddress = remoteAddress; 138 this.timeout = timeout; 139 this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE; 140 this.bbAllocator = byteBuffAllocator; 141 this.cellBlockBuilder = cellBlockBuilder; 142 this.reqCleanup = reqCleanup; 143 this.span = Span.current(); 144 } 145 146 /** 147 * Call is done. Execution happened and we returned results to client. It is now safe to cleanup. 148 */ 149 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 150 justification = "Presume the lock on processing request held by caller is protection enough") 151 @Override 152 public void done() { 153 if (this.cellBlockStream != null) { 154 // This will return back the BBs which we got from pool. 155 this.cellBlockStream.releaseResources(); 156 this.cellBlockStream = null; 157 } 158 // If the call was run successfuly, we might have already returned the BB 159 // back to pool. 
No worries..Then inputCellBlock will be null 160 cleanup(); 161 span.end(); 162 } 163 164 @Override 165 public void cleanup() { 166 for (;;) { 167 int ref = reference.get(); 168 if ((ref & 0x80000000) == 0) { 169 return; 170 } 171 int nextRef = ref & 0x7fffffff; 172 if (reference.compareAndSet(ref, nextRef)) { 173 if (nextRef == 0) { 174 if (this.reqCleanup != null) { 175 this.reqCleanup.run(); 176 } 177 } 178 return; 179 } 180 } 181 } 182 183 public void retainByWAL() { 184 reference.incrementAndGet(); 185 } 186 187 public void releaseByWAL() { 188 // Here this method of decrementAndGet for releasing WAL reference count will work in both 189 // cases - i.e. highest bit (cleanup) 1 or 0. We will be decrementing a negative or positive 190 // value respectively in these 2 cases, but the logic will work the same way 191 if (reference.decrementAndGet() == 0) { 192 if (this.reqCleanup != null) { 193 this.reqCleanup.run(); 194 } 195 } 196 } 197 198 @Override 199 public String toString() { 200 return toShortString() + " param: " 201 + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + " connection: " 202 + connection.toString(); 203 } 204 205 @Override 206 public RequestHeader getHeader() { 207 return this.header; 208 } 209 210 @Override 211 public int getPriority() { 212 return this.header.getPriority(); 213 } 214 215 /* 216 * Short string representation without param info because param itself could be huge depends on 217 * the payload of a command 218 */ 219 @Override 220 public String toShortString() { 221 String serviceName = this.connection.service != null 222 ? this.connection.service.getDescriptorForType().getName() 223 : "null"; 224 return "callId: " + this.id + " service: " + serviceName + " methodName: " 225 + ((this.md != null) ? 
this.md.getName() : "n/a") + " size: " 226 + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + " connection: " 227 + connection + " deadline: " + deadline; 228 } 229 230 @Override 231 public synchronized void setResponse(Message m, final CellScanner cells, Throwable t, 232 String errorMsg) { 233 if (this.isError) { 234 return; 235 } 236 if (t != null) { 237 this.isError = true; 238 TraceUtil.setError(span, t); 239 } else { 240 span.setStatus(StatusCode.OK); 241 } 242 BufferChain bc = null; 243 try { 244 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); 245 // Call id. 246 headerBuilder.setCallId(this.id); 247 if (t != null) { 248 setExceptionResponse(t, errorMsg, headerBuilder); 249 } 250 // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the 251 // reservoir when finished. This is hacky and the hack is not contained but benefits are 252 // high when we can avoid a big buffer allocation on each rpc. 253 List<ByteBuffer> cellBlock = null; 254 int cellBlockSize = 0; 255 if (bbAllocator.isReservoirEnabled()) { 256 this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec, 257 this.connection.compressionCodec, cells, bbAllocator); 258 if (this.cellBlockStream != null) { 259 cellBlock = this.cellBlockStream.getByteBuffers(); 260 cellBlockSize = this.cellBlockStream.size(); 261 } 262 } else { 263 ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec, 264 this.connection.compressionCodec, cells); 265 if (b != null) { 266 cellBlockSize = b.remaining(); 267 cellBlock = new ArrayList<>(1); 268 cellBlock.add(b); 269 } 270 } 271 272 if (cellBlockSize > 0) { 273 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); 274 // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. 
275 cellBlockBuilder.setLength(cellBlockSize); 276 headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); 277 } 278 Message header = headerBuilder.build(); 279 ByteBuffer headerBuf = createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock); 280 ByteBuffer[] responseBufs = null; 281 int cellBlockBufferSize = 0; 282 if (cellBlock != null) { 283 cellBlockBufferSize = cellBlock.size(); 284 responseBufs = new ByteBuffer[1 + cellBlockBufferSize]; 285 } else { 286 responseBufs = new ByteBuffer[1]; 287 } 288 responseBufs[0] = headerBuf; 289 if (cellBlock != null) { 290 for (int i = 0; i < cellBlockBufferSize; i++) { 291 responseBufs[i + 1] = cellBlock.get(i); 292 } 293 } 294 bc = new BufferChain(responseBufs); 295 } catch (IOException e) { 296 RpcServer.LOG.warn("Exception while creating response " + e); 297 } 298 this.response = bc; 299 // Once a response message is created and set to this.response, this Call can be treated as 300 // done. The Responder thread will do the n/w write of this message back to client. 301 if (this.rpcCallback != null) { 302 try (Scope ignored = span.makeCurrent()) { 303 this.rpcCallback.run(); 304 } catch (Exception e) { 305 // Don't allow any exception here to kill this handler thread. 306 RpcServer.LOG.warn("Exception while running the Rpc Callback.", e); 307 TraceUtil.setError(span, e); 308 } 309 } 310 } 311 312 static void setExceptionResponse(Throwable t, String errorMsg, 313 ResponseHeader.Builder headerBuilder) { 314 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); 315 exceptionBuilder.setExceptionClassName(t.getClass().getName()); 316 exceptionBuilder.setStackTrace(errorMsg); 317 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException); 318 if (t instanceof RegionMovedException) { 319 // Special casing for this exception. This is only one carrying a payload. 320 // Do this instead of build a generic system for allowing exceptions carry 321 // any kind of payload. 
322 RegionMovedException rme = (RegionMovedException) t; 323 exceptionBuilder.setHostname(rme.getHostname()); 324 exceptionBuilder.setPort(rme.getPort()); 325 } else if (t instanceof HBaseServerException) { 326 HBaseServerException hse = (HBaseServerException) t; 327 exceptionBuilder.setServerOverloaded(hse.isServerOverloaded()); 328 } 329 // Set the exception as the result of the method invocation. 330 headerBuilder.setException(exceptionBuilder.build()); 331 } 332 333 static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize, 334 List<ByteBuffer> cellBlock) throws IOException { 335 // Organize the response as a set of bytebuffers rather than collect it all together inside 336 // one big byte array; save on allocations. 337 // for writing the header, we check if there is available space in the buffers 338 // created for the cellblock itself. If there is space for the header, we reuse 339 // the last buffer in the cellblock. This applies to the cellblock created from the 340 // pool or even the onheap cellblock buffer in case there is no pool enabled. 341 // Possible reuse would avoid creating a temporary array for storing the header every time. 342 ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? 
cellBlock.get(cellBlock.size() - 1) : null; 343 int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, resultVintSize = 0; 344 if (header != null) { 345 headerSerializedSize = header.getSerializedSize(); 346 headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize); 347 } 348 if (result != null) { 349 resultSerializedSize = result.getSerializedSize(); 350 resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize); 351 } 352 // calculate the total size 353 int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize) 354 + cellBlockSize; 355 int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + resultVintSize 356 + Bytes.SIZEOF_INT; 357 // Only if the last buffer has enough space for header use it. Else allocate 358 // a new buffer. Assume they are all flipped 359 if (possiblePBBuf != null && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { 360 // duplicate the buffer. 
This is where the header is going to be written 361 ByteBuffer pbBuf = possiblePBBuf.duplicate(); 362 // get the current limit 363 int limit = pbBuf.limit(); 364 // Position such that we write the header to the end of the buffer 365 pbBuf.position(limit); 366 // limit to the header size 367 pbBuf.limit(totalPBSize + limit); 368 // mark the current position 369 pbBuf.mark(); 370 writeToCOS(result, header, totalSize, pbBuf); 371 // reset the buffer back to old position 372 pbBuf.reset(); 373 return pbBuf; 374 } else { 375 return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); 376 } 377 } 378 379 private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) 380 throws IOException { 381 ByteBufferUtils.putInt(pbBuf, totalSize); 382 // create COS that works on BB 383 CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf); 384 if (header != null) { 385 cos.writeMessageNoTag(header); 386 } 387 if (result != null) { 388 cos.writeMessageNoTag(result); 389 } 390 cos.flush(); 391 cos.checkNoSpaceLeft(); 392 } 393 394 private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, 395 int totalSize, int totalPBSize) throws IOException { 396 ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize); 397 writeToCOS(result, header, totalSize, pbBuf); 398 pbBuf.flip(); 399 return pbBuf; 400 } 401 402 protected BufferChain wrapWithSasl(BufferChain bc) throws IOException { 403 if (!this.connection.useSasl) { 404 return bc; 405 } 406 // Looks like no way around this; saslserver wants a byte array. I have to make it one. 407 // THIS IS A BIG UGLY COPY. 408 byte[] responseBytes = bc.getBytes(); 409 byte[] token; 410 // synchronization may be needed since there can be multiple Handler 411 // threads using saslServer or Crypto AES to wrap responses. 
412 if (connection.useCryptoAesWrap) { 413 // wrap with Crypto AES 414 synchronized (connection.cryptoAES) { 415 token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length); 416 } 417 } else { 418 synchronized (connection.saslServer) { 419 token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); 420 } 421 } 422 if (RpcServer.LOG.isTraceEnabled()) { 423 RpcServer.LOG 424 .trace("Adding saslServer wrapped token of size " + token.length + " as call response."); 425 } 426 427 ByteBuffer[] responseBufs = new ByteBuffer[2]; 428 responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length)); 429 responseBufs[1] = ByteBuffer.wrap(token); 430 return new BufferChain(responseBufs); 431 } 432 433 @Override 434 public long disconnectSince() { 435 if (!this.connection.isConnectionOpen()) { 436 return EnvironmentEdgeManager.currentTime() - receiveTime; 437 } else { 438 return -1L; 439 } 440 } 441 442 @Override 443 public boolean isClientCellBlockSupported() { 444 return this.connection != null && this.connection.codec != null; 445 } 446 447 @Override 448 public long getResponseCellSize() { 449 return responseCellSize; 450 } 451 452 @Override 453 public void incrementResponseCellSize(long cellSize) { 454 responseCellSize += cellSize; 455 } 456 457 @Override 458 public long getResponseBlockSize() { 459 return responseBlockSize; 460 } 461 462 @Override 463 public void incrementResponseBlockSize(long blockSize) { 464 responseBlockSize += blockSize; 465 } 466 467 @Override 468 public long getResponseExceptionSize() { 469 return exceptionSize; 470 } 471 472 @Override 473 public void incrementResponseExceptionSize(long exSize) { 474 exceptionSize += exSize; 475 } 476 477 @Override 478 public long getSize() { 479 return this.size; 480 } 481 482 @Override 483 public long getDeadline() { 484 return deadline; 485 } 486 487 @Override 488 public Optional<User> getRequestUser() { 489 return Optional.ofNullable(user); 490 } 491 492 @Override 493 public 
InetAddress getRemoteAddress() { 494 return remoteAddress; 495 } 496 497 @Override 498 public VersionInfo getClientVersionInfo() { 499 return connection.getVersionInfo(); 500 } 501 502 @Override 503 public synchronized void setCallBack(RpcCallback callback) { 504 this.rpcCallback = callback; 505 } 506 507 @Override 508 public boolean isRetryImmediatelySupported() { 509 return retryImmediatelySupported; 510 } 511 512 @Override 513 public BlockingService getService() { 514 return service; 515 } 516 517 @Override 518 public MethodDescriptor getMethod() { 519 return md; 520 } 521 522 @Override 523 public Message getParam() { 524 return param; 525 } 526 527 @Override 528 public CellScanner getCellScanner() { 529 return cellScanner; 530 } 531 532 @Override 533 public long getReceiveTime() { 534 return receiveTime; 535 } 536 537 @Override 538 public long getStartTime() { 539 return startTime; 540 } 541 542 @Override 543 public void setStartTime(long t) { 544 this.startTime = t; 545 } 546 547 @Override 548 public int getTimeout() { 549 return timeout; 550 } 551 552 @Override 553 public int getRemotePort() { 554 return connection.getRemotePort(); 555 } 556 557 @Override 558 public synchronized BufferChain getResponse() { 559 if (connection.useWrap) { 560 /* 561 * wrapping result with SASL as the last step just before sending it out, so every message 562 * must have the right increasing sequence number 563 */ 564 try { 565 return wrapWithSasl(response); 566 } catch (IOException e) { 567 /* it is exactly the same what setResponse() does */ 568 RpcServer.LOG.warn("Exception while creating response " + e); 569 return null; 570 } 571 } else { 572 return response; 573 } 574 } 575 576 @RestrictedApi(explanation = "Should only be called in tests", link = "", 577 allowedOnPath = ".*/src/test/.*") 578 public synchronized RpcCallback getCallBack() { 579 return this.rpcCallback; 580 } 581}