/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;

/**
 * Data structure that holds everything necessary for a method invocation and then, afterward,
 * carries the result.
 */
@InterfaceAudience.Private
public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {

  protected final int id; // the client's call id
  protected final BlockingService service;
  protected final MethodDescriptor md;
  protected final RequestHeader header;
  protected Message param; // the parameter passed
  // Optional cell data passed outside of protobufs.
  protected final ExtendedCellScanner cellScanner;
  protected final T connection; // connection to client; the constructor tolerates null
  protected final long receiveTime; // the time received when response is null
  // the time served when response is not null
  protected final int timeout;
  protected long startTime;
  protected final long deadline; // the deadline to handle this call, if exceed we can drop it.

  protected final ByteBuffAllocator bbAllocator;

  protected final CellBlockBuilder cellBlockBuilder;

  /**
   * Chain of buffers to send as response.
   */
  protected BufferChain response;

  protected final long size; // size of current call
  protected boolean isError;
  protected ByteBufferListOutputStream cellBlockStream = null;
  protected CallCleanup reqCleanup = null;

  protected final User user;
  protected final InetAddress remoteAddress;
  protected final X509Certificate[] clientCertificateChain;
  protected RpcCallback rpcCallback;

  private long responseCellSize = 0;
  private long responseBlockSize = 0;
  // cumulative size of serialized exceptions
  private long exceptionSize = 0;
  private final boolean retryImmediatelySupported;
  private volatile Map<String, byte[]> requestAttributes;

  // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and
  // the rest of the bits are for WAL reference count. We can only call release if all of them are
  // zero. The reason why we can not use a general reference counting is that, we may call cleanup
  // multiple times in the current implementation. We should fix this in the future.
  // The refCount here will start as 0x80000000 and increment with every WAL reference and decrement
  // from WAL side on release
  private final AtomicInteger reference = new AtomicInteger(0x80000000);

  private final Span span;

  /**
   * Captures the request state for one call. When {@code connection} is null the user, the
   * retry-immediately flag and the client certificate chain fall back to null / false / null. The
   * deadline is {@code receiveTime + timeout} when {@code timeout > 0}, otherwise
   * {@link Long#MAX_VALUE}. The span that is current at construction time is remembered and later
   * ended by {@link #done()}.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
      justification = "Can't figure why this complaint is happening... see below")
  ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
    Message param, ExtendedCellScanner cellScanner, T connection, long size,
    InetAddress remoteAddress, long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
    CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
    this.id = id;
    this.service = service;
    this.md = md;
    this.header = header;
    this.param = param;
    this.cellScanner = cellScanner;
    this.connection = connection;
    this.receiveTime = receiveTime;
    this.response = null;
    this.isError = false;
    this.size = size;
    if (connection != null) {
      this.user = connection.user;
      this.retryImmediatelySupported = connection.retryImmediatelySupported;
      this.clientCertificateChain = connection.clientCertificateChain;
    } else {
      this.user = null;
      this.retryImmediatelySupported = false;
      this.clientCertificateChain = null;
    }
    this.remoteAddress = remoteAddress;
    this.timeout = timeout;
    this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
    this.bbAllocator = byteBuffAllocator;
    this.cellBlockBuilder = cellBlockBuilder;
    this.reqCleanup = reqCleanup;
    this.span = Span.current();
  }

  /**
   * Call is done. Execution happened and we returned results to client. It is now safe to cleanup.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "Presume the lock on processing request held by caller is protection enough")
  @Override
  public void done() {
    if (this.cellBlockStream != null) {
      // This will return back the BBs which we got from pool.
      this.cellBlockStream.releaseResources();
      this.cellBlockStream = null;
    }
    // If the call was run successfully, we might have already returned the BB
    // back to pool. No worries..Then inputCellBlock will be null
    cleanup();
    span.end();
  }

  /**
   * Clears the highest (rpc) bit of the reference word. Only the invocation that actually clears
   * the bit may run {@code reqCleanup}, and it does so only if no WAL references remain; later
   * invocations see the bit already cleared and return immediately.
   */
  @Override
  public void cleanup() {
    for (;;) {
      int ref = reference.get();
      if ((ref & 0x80000000) == 0) {
        // rpc bit already cleared by an earlier cleanup() call
        return;
      }
      int nextRef = ref & 0x7fffffff;
      if (reference.compareAndSet(ref, nextRef)) {
        if (nextRef == 0) {
          if (this.reqCleanup != null) {
            this.reqCleanup.run();
          }
        }
        return;
      }
      // lost the CAS race against a concurrent retain/release; retry with the fresh value
    }
  }

  /** Adds one WAL reference to the reference word. */
  public void retainByWAL() {
    reference.incrementAndGet();
  }

  /**
   * Drops one WAL reference and runs {@code reqCleanup} if the reference word reaches zero.
   */
  public void releaseByWAL() {
    // Here this method of decrementAndGet for releasing WAL reference count will work in both
    // cases - i.e. highest bit (cleanup) 1 or 0. We will be decrementing a negative or positive
    // value respectively in these 2 cases, but the logic will work the same way
    if (reference.decrementAndGet() == 0) {
      if (this.reqCleanup != null) {
        this.reqCleanup.run();
      }
    }
  }

  /**
   * Returns {@link #toShortString()} followed by a short text form of the parameter (empty when
   * there is none) and the connection. A null connection is rendered as "null" instead of
   * throwing.
   */
  @Override
  public String toString() {
    // String concatenation renders a null connection as "null"; an explicit toString() call
    // would throw for calls that were constructed without a connection.
    return toShortString() + " param: "
      + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + " connection: "
      + connection;
  }

  @Override
  public RequestHeader getHeader() {
    return this.header;
  }

  @Override
  public Map<String, byte[]> getConnectionAttributes() {
    return this.connection.connectionAttributes;
  }

  /**
   * Returns the request attributes from the header as a map, building and caching the map on
   * first use. An empty attribute list yields {@link Collections#emptyMap()}.
   */
  @Override
  public Map<String, byte[]> getRequestAttributes() {
    // Read the volatile field once; it only ever transitions from null to a populated map.
    Map<String, byte[]> attributes = this.requestAttributes;
    if (attributes == null) {
      if (header.getAttributeList().isEmpty()) {
        attributes = Collections.emptyMap();
      } else {
        attributes = Maps.newHashMapWithExpectedSize(header.getAttributeList().size());
        for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
          attributes.put(nameBytesPair.getName(), nameBytesPair.getValue().toByteArray());
        }
      }
      this.requestAttributes = attributes;
    }
    return attributes;
  }

  /**
   * Looks up a single request attribute. If the attribute map has not been built yet, the header's
   * attribute list is scanned directly so that no map is allocated for a one-off lookup.
   * @param key the attribute name
   * @return the attribute value, or null when no attribute with that name exists
   */
  @Override
  public byte[] getRequestAttribute(String key) {
    Map<String, byte[]> attributes = this.requestAttributes;
    if (attributes == null) {
      for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
        if (nameBytesPair.getName().equals(key)) {
          return nameBytesPair.getValue().toByteArray();
        }
      }
      return null;
    }
    return attributes.get(key);
  }

  @Override
  public int getPriority() {
    return this.header.getPriority();
  }

  /*
   * Short string representation without param info because param itself could be huge depends on
   * the payload of a command
   */
  @Override
  public String toShortString() {
    // The constructor accepts a null connection, so guard before reaching into it.
    String serviceName = this.connection != null && this.connection.service != null
      ? this.connection.service.getDescriptorForType().getName()
      : "null";
    return "callId: " + this.id + " service: " + serviceName + " methodName: "
      + ((this.md != null) ? this.md.getName() : "n/a") + " size: "
      + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + " connection: "
      + connection + " deadline: " + deadline;
  }

  /**
   * Builds the response buffer chain for this call and stores it in {@link #response}, then runs
   * the registered {@link RpcCallback}, if any. Does nothing when an error response has already
   * been set. If building the response fails with an {@link IOException}, a minimal error response
   * is attempted instead.
   * @param m        the result message, may be null
   * @param cells    cells to ship outside of the protobuf message, may be null
   * @param t        the failure to report, or null on success
   * @param errorMsg text placed in the exception response's stack trace field when {@code t} is
   *                 not null
   */
  @Override
  public synchronized void setResponse(Message m, final ExtendedCellScanner cells, Throwable t,
    String errorMsg) {
    if (this.isError) {
      return;
    }
    if (t != null) {
      this.isError = true;
      TraceUtil.setError(span, t);
    } else {
      span.setStatus(StatusCode.OK);
    }
    BufferChain bc = null;
    try {
      ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
      // Call id.
      headerBuilder.setCallId(this.id);
      if (t != null) {
        setExceptionResponse(t, errorMsg, headerBuilder);
      }
      // Pass reservoir to buildCellBlock. Keep reference to returned so can add it back to the
      // reservoir when finished. This is hacky and the hack is not contained but benefits are
      // high when we can avoid a big buffer allocation on each rpc.
      List<ByteBuffer> cellBlock = null;
      int cellBlockSize = 0;
      if (bbAllocator.isReservoirEnabled()) {
        this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
          this.connection.compressionCodec, cells, bbAllocator);
        if (this.cellBlockStream != null) {
          cellBlock = this.cellBlockStream.getByteBuffers();
          cellBlockSize = this.cellBlockStream.size();
        }
      } else {
        ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec,
          this.connection.compressionCodec, cells);
        if (b != null) {
          cellBlockSize = b.remaining();
          cellBlock = new ArrayList<>(1);
          cellBlock.add(b);
        }
      }

      if (cellBlockSize > 0) {
        CellBlockMeta.Builder cellBlockMeta = CellBlockMeta.newBuilder();
        // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
        cellBlockMeta.setLength(cellBlockSize);
        headerBuilder.setCellBlockMeta(cellBlockMeta.build());
      }
      Message responseHeader = headerBuilder.build();
      ByteBuffer headerBuf =
        createHeaderAndMessageBytes(m, responseHeader, cellBlockSize, cellBlock);
      // Header buffer goes first, followed by every cell block buffer in order.
      int cellBlockBufferCount = cellBlock != null ? cellBlock.size() : 0;
      ByteBuffer[] responseBufs = new ByteBuffer[1 + cellBlockBufferCount];
      responseBufs[0] = headerBuf;
      if (cellBlock != null) {
        for (int i = 0; i < cellBlockBufferCount; i++) {
          responseBufs[i + 1] = cellBlock.get(i);
        }
      }
      bc = new BufferChain(responseBufs);
    } catch (IOException e) {
      // Hand the exception to the logger so that the stack trace is not lost.
      RpcServer.LOG.warn("Exception while creating response", e);
      bc = createFallbackErrorResponse(e);
    }
    this.response = bc;
    // Once a response message is created and set to this.response, this Call can be treated as
    // done. The Responder thread will do the n/w write of this message back to client.
    if (this.rpcCallback != null) {
      try (Scope ignored = span.makeCurrent()) {
        this.rpcCallback.run();
      } catch (Exception e) {
        // Don't allow any exception here to kill this handler thread.
        RpcServer.LOG.warn("Exception while running the Rpc Callback.", e);
        TraceUtil.setError(span, e);
      }
    }
  }

  /**
   * Fills {@code headerBuilder} with an exception response describing {@code t}.
   * @param t             the failure; its class name and do-not-retry nature are recorded
   * @param errorMsg      text for the stack trace field; the field is left unset when this is null
   * @param headerBuilder the response header under construction
   */
  static void setExceptionResponse(Throwable t, String errorMsg,
    ResponseHeader.Builder headerBuilder) {
    ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
    exceptionBuilder.setExceptionClassName(t.getClass().getName());
    if (errorMsg != null) {
      // Protobuf builder setters reject null; skip the field rather than fail the whole response.
      exceptionBuilder.setStackTrace(errorMsg);
    }
    exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
    if (t instanceof RegionMovedException) {
      // Special casing for this exception. This is only one carrying a payload.
      // Do this instead of build a generic system for allowing exceptions carry
      // any kind of payload.
      RegionMovedException rme = (RegionMovedException) t;
      exceptionBuilder.setHostname(rme.getHostname());
      exceptionBuilder.setPort(rme.getPort());
    } else if (t instanceof HBaseServerException) {
      HBaseServerException hse = (HBaseServerException) t;
      exceptionBuilder.setServerOverloaded(hse.isServerOverloaded());
    }
    // Set the exception as the result of the method invocation.
    headerBuilder.setException(exceptionBuilder.build());
  }

  /*
   * Creates a fallback error response when the primary response creation fails. This method is
   * invoked as a last resort when an IOException occurs during the normal response creation
   * process. It attempts to create a minimal error response containing only the error information,
   * without any cell blocks or additional data. The purpose is to ensure that the client receives
   * some indication of the failure rather than experiencing a silent connection drop. This provides
   * better error handling on the client side. Returns null if even the minimal response cannot be
   * built.
   */
  private BufferChain createFallbackErrorResponse(IOException originalException) {
    try {
      ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
      headerBuilder.setCallId(this.id);
      String responseErrorMsg =
        "Failed to create response due to: " + originalException.getMessage();
      setExceptionResponse(originalException, responseErrorMsg, headerBuilder);
      Message errorHeader = headerBuilder.build();
      ByteBuffer headerBuf = createHeaderAndMessageBytes(null, errorHeader, 0, null);
      this.isError = true;
      return new BufferChain(new ByteBuffer[] { headerBuf });
    } catch (IOException e) {
      RpcServer.LOG.error("Failed to create error response for client, connection may be dropped",
        e);
      return null;
    }
  }

  /**
   * Serializes the total-size prefix, the header and the result into a single buffer.
   * <p>
   * The total size written up front covers the header and result (each including its varint
   * length prefix) plus {@code cellBlockSize}. When the last cell block buffer has enough spare
   * capacity after its limit, the bytes are written there and a duplicate positioned over them is
   * returned; otherwise a new heap buffer is allocated.
   * @param result        the result message, may be null
   * @param header        the response header, may be null
   * @param cellBlockSize total size in bytes of the cell block; when greater than zero,
   *                      {@code cellBlock} must be non-empty
   * @param cellBlock     the cell block buffers, consulted only when {@code cellBlockSize > 0}
   * @return a buffer whose remaining bytes are the size prefix, header and result
   */
  static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize,
    List<ByteBuffer> cellBlock) throws IOException {
    // Organize the response as a set of bytebuffers rather than collect it all together inside
    // one big byte array; save on allocations.
    // for writing the header, we check if there is available space in the buffers
    // created for the cellblock itself. If there is space for the header, we reuse
    // the last buffer in the cellblock. This applies to the cellblock created from the
    // pool or even the onheap cellblock buffer in case there is no pool enabled.
    // Possible reuse would avoid creating a temporary array for storing the header every time.
    ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
    int headerSerializedSize = 0;
    int resultSerializedSize = 0;
    int headerVintSize = 0;
    int resultVintSize = 0;
    if (header != null) {
      headerSerializedSize = header.getSerializedSize();
      headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize);
    }
    if (result != null) {
      resultSerializedSize = result.getSerializedSize();
      resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize);
    }
    // calculate the total size
    int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize)
      + cellBlockSize;
    int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + resultVintSize
      + Bytes.SIZEOF_INT;
    // Only if the last buffer has enough space for header use it. Else allocate
    // a new buffer. Assume they are all flipped
    if (possiblePBBuf != null && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
      // duplicate the buffer. This is where the header is going to be written
      ByteBuffer pbBuf = possiblePBBuf.duplicate();
      // get the current limit
      int limit = pbBuf.limit();
      // Position such that we write the header to the end of the buffer
      pbBuf.position(limit);
      // limit to the header size
      pbBuf.limit(totalPBSize + limit);
      // mark the current position
      pbBuf.mark();
      writeToCOS(result, header, totalSize, pbBuf);
      // reset the buffer back to old position
      pbBuf.reset();
      return pbBuf;
    } else {
      return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
    }
  }

  /**
   * Writes {@code totalSize} as an int followed by the non-null messages into {@code pbBuf}, and
   * verifies that the buffer was filled exactly.
   */
  private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
    throws IOException {
    ByteBufferUtils.putInt(pbBuf, totalSize);
    // create COS that works on BB
    CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
    if (header != null) {
      cos.writeMessageNoTag(header);
    }
    if (result != null) {
      cos.writeMessageNoTag(result);
    }
    cos.flush();
    cos.checkNoSpaceLeft();
  }

  /**
   * Allocates a fresh heap buffer of {@code totalPBSize} bytes, writes the size prefix and
   * messages into it and flips it for reading.
   */
  private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
    int totalSize, int totalPBSize) throws IOException {
    ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
    writeToCOS(result, header, totalSize, pbBuf);
    pbBuf.flip();
    return pbBuf;
  }

  /**
   * Returns the time elapsed since this call was received when the connection is no longer open,
   * or -1 while it is still open.
   */
  @Override
  public long disconnectSince() {
    if (!this.connection.isConnectionOpen()) {
      return EnvironmentEdgeManager.currentTime() - receiveTime;
    } else {
      return -1L;
    }
  }

  @Override
  public boolean isClientCellBlockSupported() {
    return this.connection != null && this.connection.codec != null;
  }

  @Override
  public long getResponseCellSize() {
    return responseCellSize;
  }

  @Override
  public void incrementResponseCellSize(long cellSize) {
    responseCellSize += cellSize;
  }

  @Override
  public long getBlockBytesScanned() {
    return responseBlockSize;
  }

  @Override
  public void incrementBlockBytesScanned(long blockSize) {
    responseBlockSize += blockSize;
  }

  @Override
  public long getResponseExceptionSize() {
    return exceptionSize;
  }

  @Override
  public void incrementResponseExceptionSize(long exSize) {
    exceptionSize += exSize;
  }

  @Override
  public long getSize() {
    return this.size;
  }

  @Override
  public long getDeadline() {
    return deadline;
  }

  @Override
  public Optional<User> getRequestUser() {
    return Optional.ofNullable(user);
  }

  @Override
  public Optional<X509Certificate[]> getClientCertificateChain() {
    return Optional.ofNullable(clientCertificateChain);
  }

  @Override
  public InetAddress getRemoteAddress() {
    return remoteAddress;
  }

  @Override
  public VersionInfo getClientVersionInfo() {
    return connection.getVersionInfo();
  }

  @Override
  public synchronized void setCallBack(RpcCallback callback) {
    this.rpcCallback = callback;
  }

  @Override
  public boolean isRetryImmediatelySupported() {
    return retryImmediatelySupported;
  }

  @Override
  public BlockingService getService() {
    return service;
  }

  @Override
  public MethodDescriptor getMethod() {
    return md;
  }

  @Override
  public Message getParam() {
    return param;
  }

  @Override
  public ExtendedCellScanner getCellScanner() {
    return cellScanner;
  }

  @Override
  public long getReceiveTime() {
    return receiveTime;
  }

  @Override
  public long getStartTime() {
    return startTime;
  }

  @Override
  public void setStartTime(long t) {
    this.startTime = t;
  }

  @Override
  public int getTimeout() {
    return timeout;
  }

  @Override
  public int getRemotePort() {
    return connection.getRemotePort();
  }

  @Override
  public synchronized BufferChain getResponse() {
    return response;
  }
}