001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import io.opentelemetry.api.trace.Span; 021import io.opentelemetry.api.trace.StatusCode; 022import io.opentelemetry.context.Scope; 023import java.io.IOException; 024import java.net.InetAddress; 025import java.nio.ByteBuffer; 026import java.security.cert.X509Certificate; 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.List; 030import java.util.Map; 031import java.util.Optional; 032import java.util.concurrent.atomic.AtomicInteger; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.ExtendedCellScanner; 035import org.apache.hadoop.hbase.HBaseServerException; 036import org.apache.hadoop.hbase.exceptions.RegionMovedException; 037import org.apache.hadoop.hbase.io.ByteBuffAllocator; 038import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; 039import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.trace.TraceUtil; 042import org.apache.hadoop.hbase.util.ByteBufferUtils; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 045import org.apache.hadoop.util.StringUtils; 046import org.apache.yetus.audience.InterfaceAudience; 047 048import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 050import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; 051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; 052import org.apache.hbase.thirdparty.com.google.protobuf.Message; 053 054import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 061 062/** 063 * Datastructure that holds all necessary to a method invocation and then afterward, carries the 064 * result. 065 */ 066@InterfaceAudience.Private 067public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse { 068 069 protected final int id; // the client's call id 070 protected final BlockingService service; 071 protected final MethodDescriptor md; 072 protected final RequestHeader header; 073 protected Message param; // the parameter passed 074 // Optional cell data passed outside of protobufs. 075 protected final ExtendedCellScanner cellScanner; 076 protected final T connection; // connection to client 077 protected final long receiveTime; // the time received when response is null 078 // the time served when response is not null 079 protected final int timeout; 080 protected long startTime; 081 protected final long deadline;// the deadline to handle this call, if exceed we can drop it. 082 083 protected final ByteBuffAllocator bbAllocator; 084 085 protected final CellBlockBuilder cellBlockBuilder; 086 087 /** 088 * Chain of buffers to send as response. 089 */ 090 protected BufferChain response; 091 092 protected final long size; // size of current call 093 protected boolean isError; 094 protected ByteBufferListOutputStream cellBlockStream = null; 095 protected CallCleanup reqCleanup = null; 096 097 protected final User user; 098 protected final InetAddress remoteAddress; 099 protected final X509Certificate[] clientCertificateChain; 100 protected RpcCallback rpcCallback; 101 102 private long responseCellSize = 0; 103 private long responseBlockSize = 0; 104 private long fsReadTimeMillis = 0; 105 // cumulative size of serialized exceptions 106 private long exceptionSize = 0; 107 private final boolean retryImmediatelySupported; 108 private volatile Map<String, byte[]> requestAttributes; 109 110 // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and 111 // the rest of the bits are for WAL reference count. We can only call release if all of them are 112 // zero. The reason why we can not use a general reference counting is that, we may call cleanup 113 // multiple times in the current implementation. We should fix this in the future. 114 // The refCount here will start as 0x80000000 and increment with every WAL reference and decrement 115 // from WAL side on release 116 private final AtomicInteger reference = new AtomicInteger(0x80000000); 117 118 private final Span span; 119 120 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", 121 justification = "Can't figure why this complaint is happening... see below") 122 ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, 123 Message param, ExtendedCellScanner cellScanner, T connection, long size, 124 InetAddress remoteAddress, long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator, 125 CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { 126 this.id = id; 127 this.service = service; 128 this.md = md; 129 this.header = header; 130 this.param = param; 131 this.cellScanner = cellScanner; 132 this.connection = connection; 133 this.receiveTime = receiveTime; 134 this.response = null; 135 this.isError = false; 136 this.size = size; 137 if (connection != null) { 138 this.user = connection.user; 139 this.retryImmediatelySupported = connection.retryImmediatelySupported; 140 this.clientCertificateChain = connection.clientCertificateChain; 141 } else { 142 this.user = null; 143 this.retryImmediatelySupported = false; 144 this.clientCertificateChain = null; 145 } 146 this.remoteAddress = remoteAddress; 147 this.timeout = timeout; 148 this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE; 149 this.bbAllocator = byteBuffAllocator; 150 this.cellBlockBuilder = cellBlockBuilder; 151 this.reqCleanup = reqCleanup; 152 this.span = Span.current(); 153 } 154 155 /** 156 * Call is done. Execution happened and we returned results to client. It is now safe to cleanup. 157 */ 158 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", 159 justification = "Presume the lock on processing request held by caller is protection enough") 160 @Override 161 public void done() { 162 if (this.cellBlockStream != null) { 163 // This will return back the BBs which we got from pool. 164 this.cellBlockStream.releaseResources(); 165 this.cellBlockStream = null; 166 } 167 // If the call was run successfuly, we might have already returned the BB 168 // back to pool. No worries..Then inputCellBlock will be null 169 cleanup(); 170 span.end(); 171 } 172 173 @Override 174 public void cleanup() { 175 for (;;) { 176 int ref = reference.get(); 177 if ((ref & 0x80000000) == 0) { 178 return; 179 } 180 int nextRef = ref & 0x7fffffff; 181 if (reference.compareAndSet(ref, nextRef)) { 182 if (nextRef == 0) { 183 if (this.reqCleanup != null) { 184 this.reqCleanup.run(); 185 } 186 } 187 return; 188 } 189 } 190 } 191 192 public void retainByWAL() { 193 reference.incrementAndGet(); 194 } 195 196 public void releaseByWAL() { 197 // Here this method of decrementAndGet for releasing WAL reference count will work in both 198 // cases - i.e. highest bit (cleanup) 1 or 0. We will be decrementing a negative or positive 199 // value respectively in these 2 cases, but the logic will work the same way 200 if (reference.decrementAndGet() == 0) { 201 if (this.reqCleanup != null) { 202 this.reqCleanup.run(); 203 } 204 } 205 206 } 207 208 @Override 209 public String toString() { 210 return toShortString() + " param: " 211 + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + " connection: " 212 + connection.toString(); 213 } 214 215 @Override 216 public RequestHeader getHeader() { 217 return this.header; 218 } 219 220 @Override 221 public Map<String, byte[]> getConnectionAttributes() { 222 return this.connection.connectionAttributes; 223 } 224 225 @Override 226 public Map<String, byte[]> getRequestAttributes() { 227 if (this.requestAttributes == null) { 228 if (header.getAttributeList().isEmpty()) { 229 this.requestAttributes = Collections.emptyMap(); 230 } else { 231 Map<String, byte[]> requestAttributes = 232 Maps.newHashMapWithExpectedSize(header.getAttributeList().size()); 233 for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) { 234 requestAttributes.put(nameBytesPair.getName(), nameBytesPair.getValue().toByteArray()); 235 } 236 this.requestAttributes = requestAttributes; 237 } 238 } 239 return this.requestAttributes; 240 } 241 242 @Override 243 public byte[] getRequestAttribute(String key) { 244 if (this.requestAttributes == null) { 245 for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) { 246 if (nameBytesPair.getName().equals(key)) { 247 return nameBytesPair.getValue().toByteArray(); 248 } 249 } 250 return null; 251 } 252 return this.requestAttributes.get(key); 253 } 254 255 @Override 256 public int getPriority() { 257 return this.header.getPriority(); 258 } 259 260 /* 261 * Short string representation without param info because param itself could be huge depends on 262 * the payload of a command 263 */ 264 @Override 265 public String toShortString() { 266 String serviceName = this.connection.service != null 267 ? this.connection.service.getDescriptorForType().getName() 268 : "null"; 269 return "callId: " + this.id + " service: " + serviceName + " methodName: " 270 + ((this.md != null) ? this.md.getName() : "n/a") + " size: " 271 + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + " connection: " 272 + connection + " deadline: " + deadline; 273 } 274 275 @Override 276 public synchronized void setResponse(Message m, final ExtendedCellScanner cells, Throwable t, 277 String errorMsg) { 278 if (this.isError) { 279 return; 280 } 281 if (t != null) { 282 this.isError = true; 283 TraceUtil.setError(span, t); 284 } else { 285 span.setStatus(StatusCode.OK); 286 } 287 BufferChain bc = null; 288 try { 289 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); 290 // Call id. 291 headerBuilder.setCallId(this.id); 292 if (t != null) { 293 setExceptionResponse(t, errorMsg, headerBuilder); 294 } 295 // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the 296 // reservoir when finished. This is hacky and the hack is not contained but benefits are 297 // high when we can avoid a big buffer allocation on each rpc. 298 List<ByteBuffer> cellBlock = null; 299 int cellBlockSize = 0; 300 if (bbAllocator.isReservoirEnabled()) { 301 this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec, 302 this.connection.compressionCodec, cells, bbAllocator); 303 if (this.cellBlockStream != null) { 304 cellBlock = this.cellBlockStream.getByteBuffers(); 305 cellBlockSize = this.cellBlockStream.size(); 306 } 307 } else { 308 ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec, 309 this.connection.compressionCodec, cells); 310 if (b != null) { 311 cellBlockSize = b.remaining(); 312 cellBlock = new ArrayList<>(1); 313 cellBlock.add(b); 314 } 315 } 316 317 if (cellBlockSize > 0) { 318 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); 319 // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. 320 cellBlockBuilder.setLength(cellBlockSize); 321 headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); 322 } 323 Message header = headerBuilder.build(); 324 ByteBuffer headerBuf = createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock); 325 ByteBuffer[] responseBufs = null; 326 int cellBlockBufferSize = 0; 327 if (cellBlock != null) { 328 cellBlockBufferSize = cellBlock.size(); 329 responseBufs = new ByteBuffer[1 + cellBlockBufferSize]; 330 } else { 331 responseBufs = new ByteBuffer[1]; 332 } 333 responseBufs[0] = headerBuf; 334 if (cellBlock != null) { 335 for (int i = 0; i < cellBlockBufferSize; i++) { 336 responseBufs[i + 1] = cellBlock.get(i); 337 } 338 } 339 bc = new BufferChain(responseBufs); 340 } catch (IOException e) { 341 RpcServer.LOG.warn("Exception while creating response " + e); 342 } 343 this.response = bc; 344 // Once a response message is created and set to this.response, this Call can be treated as 345 // done. The Responder thread will do the n/w write of this message back to client. 346 if (this.rpcCallback != null) { 347 try (Scope ignored = span.makeCurrent()) { 348 this.rpcCallback.run(); 349 } catch (Exception e) { 350 // Don't allow any exception here to kill this handler thread. 351 RpcServer.LOG.warn("Exception while running the Rpc Callback.", e); 352 TraceUtil.setError(span, e); 353 } 354 } 355 } 356 357 static void setExceptionResponse(Throwable t, String errorMsg, 358 ResponseHeader.Builder headerBuilder) { 359 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); 360 exceptionBuilder.setExceptionClassName(t.getClass().getName()); 361 exceptionBuilder.setStackTrace(errorMsg); 362 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException); 363 if (t instanceof RegionMovedException) { 364 // Special casing for this exception. This is only one carrying a payload. 365 // Do this instead of build a generic system for allowing exceptions carry 366 // any kind of payload. 367 RegionMovedException rme = (RegionMovedException) t; 368 exceptionBuilder.setHostname(rme.getHostname()); 369 exceptionBuilder.setPort(rme.getPort()); 370 } else if (t instanceof HBaseServerException) { 371 HBaseServerException hse = (HBaseServerException) t; 372 exceptionBuilder.setServerOverloaded(hse.isServerOverloaded()); 373 } 374 // Set the exception as the result of the method invocation. 375 headerBuilder.setException(exceptionBuilder.build()); 376 } 377 378 static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize, 379 List<ByteBuffer> cellBlock) throws IOException { 380 // Organize the response as a set of bytebuffers rather than collect it all together inside 381 // one big byte array; save on allocations. 382 // for writing the header, we check if there is available space in the buffers 383 // created for the cellblock itself. If there is space for the header, we reuse 384 // the last buffer in the cellblock. This applies to the cellblock created from the 385 // pool or even the onheap cellblock buffer in case there is no pool enabled. 386 // Possible reuse would avoid creating a temporary array for storing the header every time. 387 ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; 388 int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, resultVintSize = 0; 389 if (header != null) { 390 headerSerializedSize = header.getSerializedSize(); 391 headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize); 392 } 393 if (result != null) { 394 resultSerializedSize = result.getSerializedSize(); 395 resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize); 396 } 397 // calculate the total size 398 int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize) 399 + cellBlockSize; 400 int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + resultVintSize 401 + Bytes.SIZEOF_INT; 402 // Only if the last buffer has enough space for header use it. Else allocate 403 // a new buffer. Assume they are all flipped 404 if (possiblePBBuf != null && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { 405 // duplicate the buffer. This is where the header is going to be written 406 ByteBuffer pbBuf = possiblePBBuf.duplicate(); 407 // get the current limit 408 int limit = pbBuf.limit(); 409 // Position such that we write the header to the end of the buffer 410 pbBuf.position(limit); 411 // limit to the header size 412 pbBuf.limit(totalPBSize + limit); 413 // mark the current position 414 pbBuf.mark(); 415 writeToCOS(result, header, totalSize, pbBuf); 416 // reset the buffer back to old position 417 pbBuf.reset(); 418 return pbBuf; 419 } else { 420 return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); 421 } 422 } 423 424 private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) 425 throws IOException { 426 ByteBufferUtils.putInt(pbBuf, totalSize); 427 // create COS that works on BB 428 CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf); 429 if (header != null) { 430 cos.writeMessageNoTag(header); 431 } 432 if (result != null) { 433 cos.writeMessageNoTag(result); 434 } 435 cos.flush(); 436 cos.checkNoSpaceLeft(); 437 } 438 439 private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, 440 int totalSize, int totalPBSize) throws IOException { 441 ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize); 442 writeToCOS(result, header, totalSize, pbBuf); 443 pbBuf.flip(); 444 return pbBuf; 445 } 446 447 @Override 448 public long disconnectSince() { 449 if (!this.connection.isConnectionOpen()) { 450 return EnvironmentEdgeManager.currentTime() - receiveTime; 451 } else { 452 return -1L; 453 } 454 } 455 456 @Override 457 public boolean isClientCellBlockSupported() { 458 return this.connection != null && this.connection.codec != null; 459 } 460 461 @Override 462 public long getResponseCellSize() { 463 return responseCellSize; 464 } 465 466 @Override 467 public void incrementResponseCellSize(long cellSize) { 468 responseCellSize += cellSize; 469 } 470 471 @Override 472 public long getBlockBytesScanned() { 473 return responseBlockSize; 474 } 475 476 @Override 477 public void incrementBlockBytesScanned(long blockSize) { 478 responseBlockSize += blockSize; 479 } 480 481 @Override 482 public long getResponseExceptionSize() { 483 return exceptionSize; 484 } 485 486 @Override 487 public void incrementResponseExceptionSize(long exSize) { 488 exceptionSize += exSize; 489 } 490 491 @Override 492 public long getSize() { 493 return this.size; 494 } 495 496 @Override 497 public long getDeadline() { 498 return deadline; 499 } 500 501 @Override 502 public Optional<User> getRequestUser() { 503 return Optional.ofNullable(user); 504 } 505 506 @Override 507 public Optional<X509Certificate[]> getClientCertificateChain() { 508 return Optional.ofNullable(clientCertificateChain); 509 } 510 511 @Override 512 public InetAddress getRemoteAddress() { 513 return remoteAddress; 514 } 515 516 @Override 517 public VersionInfo getClientVersionInfo() { 518 return connection.getVersionInfo(); 519 } 520 521 @Override 522 public synchronized void setCallBack(RpcCallback callback) { 523 this.rpcCallback = callback; 524 } 525 526 @Override 527 public boolean isRetryImmediatelySupported() { 528 return retryImmediatelySupported; 529 } 530 531 @Override 532 public BlockingService getService() { 533 return service; 534 } 535 536 @Override 537 public MethodDescriptor getMethod() { 538 return md; 539 } 540 541 @Override 542 public Message getParam() { 543 return param; 544 } 545 546 @Override 547 public ExtendedCellScanner getCellScanner() { 548 return cellScanner; 549 } 550 551 @Override 552 public long getReceiveTime() { 553 return receiveTime; 554 } 555 556 @Override 557 public long getStartTime() { 558 return startTime; 559 } 560 561 @Override 562 public void setStartTime(long t) { 563 this.startTime = t; 564 } 565 566 @Override 567 public int getTimeout() { 568 return timeout; 569 } 570 571 @Override 572 public int getRemotePort() { 573 return connection.getRemotePort(); 574 } 575 576 @Override 577 public synchronized BufferChain getResponse() { 578 return response; 579 } 580 581 @Override 582 public void updateFsReadTime(long latencyMillis) { 583 fsReadTimeMillis += latencyMillis; 584 } 585 586 @Override 587 public long getFsReadTime() { 588 return fsReadTimeMillis; 589 } 590}