/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;

/**
 * Data structure that holds everything necessary for a method invocation and, afterwards, carries
 * the result.
 * <p>
 * A call owns the request state handed to the constructor, builds the response
 * {@link BufferChain} in {@link #setResponse(Message, CellScanner, Throwable, String)} and runs the
 * request cleanup hook once both the rpc side ({@link #cleanup()}) and every WAL reference
 * ({@link #retainByWAL()} / {@link #releaseByWAL()}) have let go of it.
 * @param <T> the concrete connection type this call arrived on
 */
@InterfaceAudience.Private
public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {

  protected final int id; // the client's call id
  protected final BlockingService service;
  protected final MethodDescriptor md;
  protected final RequestHeader header;
  protected Message param; // the parameter passed
  // Optional cell data passed outside of protobufs.
  protected final CellScanner cellScanner;
  protected final T connection; // connection to client
  protected final long receiveTime; // the time received when response is null
  // the time served when response is not null
  protected final int timeout;
  protected long startTime;
  protected final long deadline;// the deadline to handle this call, if exceed we can drop it.

  protected final ByteBuffAllocator bbAllocator;

  protected final CellBlockBuilder cellBlockBuilder;

  /**
   * Chain of buffers to send as response.
   */
  protected BufferChain response;

  protected final long size; // size of current call
  protected boolean isError;
  protected ByteBufferListOutputStream cellBlockStream = null;
  protected CallCleanup reqCleanup = null;

  protected final User user;
  protected final InetAddress remoteAddress;
  protected RpcCallback rpcCallback;

  private long responseCellSize = 0;
  private long responseBlockSize = 0;
  // cumulative size of serialized exceptions
  private long exceptionSize = 0;
  private final boolean retryImmediatelySupported;
  // Lazily built view of the request header attributes; see getRequestAttributes().
  private volatile Map<String, byte[]> requestAttributes;

  // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and
  // the rest of the bits are for WAL reference count. We can only call release if all of them are
  // zero. The reason why we can not use a general reference counting is that, we may call cleanup
  // multiple times in the current implementation. We should fix this in the future.
  // The refCount here will start as 0x80000000 and increment with every WAL reference and decrement
  // from WAL side on release
  private final AtomicInteger reference = new AtomicInteger(0x80000000);

  // Span that was current when this call was constructed; ended in done().
  private final Span span;

  /**
   * Creates a call. {@code connection} may be {@code null}, in which case the request user is
   * {@code null} and immediate retry is reported as unsupported.
   * <p>
   * The deadline is {@code receiveTime + timeout} when {@code timeout} is positive, otherwise
   * {@link Long#MAX_VALUE}.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
      justification = "Can't figure why this complaint is happening... see below")
  ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
    Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress,
    long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
    CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
    this.id = id;
    this.service = service;
    this.md = md;
    this.header = header;
    this.param = param;
    this.cellScanner = cellScanner;
    this.connection = connection;
    this.receiveTime = receiveTime;
    this.response = null;
    this.isError = false;
    this.size = size;
    if (connection != null) {
      this.user = connection.user;
      this.retryImmediatelySupported = connection.retryImmediatelySupported;
    } else {
      this.user = null;
      this.retryImmediatelySupported = false;
    }
    this.remoteAddress = remoteAddress;
    this.timeout = timeout;
    this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
    this.bbAllocator = byteBuffAllocator;
    this.cellBlockBuilder = cellBlockBuilder;
    this.reqCleanup = reqCleanup;
    this.span = Span.current();
  }

  /**
   * Call is done. Execution happened and we returned results to client. It is now safe to cleanup.
   * Releases the response cell block stream (if any), drops the rpc reference via
   * {@link #cleanup()} and ends the tracing span.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "Presume the lock on processing request held by caller is protection enough")
  @Override
  public void done() {
    if (this.cellBlockStream != null) {
      // This will return back the BBs which we got from pool.
      this.cellBlockStream.releaseResources();
      this.cellBlockStream = null;
    }
    // If the call was run successfully, we might have already returned the BB
    // back to pool. No worries..Then inputCellBlock will be null
    cleanup();
    span.end();
  }

  /**
   * Drops the rpc-side reference (the highest bit of {@link #reference}). Safe to call more than
   * once: once the highest bit is clear, later calls return without doing anything. The request
   * cleanup hook runs here only if no WAL references remain after the bit is cleared.
   */
  @Override
  public void cleanup() {
    for (;;) {
      int ref = reference.get();
      if ((ref & 0x80000000) == 0) {
        // The rpc reference has already been dropped by an earlier cleanup().
        return;
      }
      // Clear the highest bit, leaving only the WAL reference count.
      int nextRef = ref & 0x7fffffff;
      if (reference.compareAndSet(ref, nextRef)) {
        if (nextRef == 0) {
          if (this.reqCleanup != null) {
            this.reqCleanup.run();
          }
        }
        return;
      }
      // Lost a race with a concurrent retain/release; re-read and retry.
    }
  }

  /**
   * Adds one WAL reference to this call.
   */
  public void retainByWAL() {
    reference.incrementAndGet();
  }

  /**
   * Drops one WAL reference. Runs the request cleanup hook if the counter reaches zero, i.e. the
   * rpc reference has already been dropped and this was the last WAL reference.
   */
  public void releaseByWAL() {
    // Here this method of decrementAndGet for releasing WAL reference count will work in both
    // cases - i.e. highest bit (cleanup) 1 or 0. We will be decrementing a negative or positive
    // value respectively in these 2 cases, but the logic will work the same way
    if (reference.decrementAndGet() == 0) {
      if (this.reqCleanup != null) {
        this.reqCleanup.run();
      }
    }
  }

  /**
   * Returns {@link #toShortString()} followed by the short text form of the request parameter and
   * the connection. Does not throw when the connection is {@code null}.
   */
  @Override
  public String toString() {
    // String concatenation renders a null connection as "null" rather than throwing, which
    // matters because the constructor explicitly allows a null connection.
    return toShortString() + " param: "
      + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + " connection: "
      + connection;
  }

  @Override
  public RequestHeader getHeader() {
    return this.header;
  }

  @Override
  public Map<String, byte[]> getConnectionAttributes() {
    return this.connection.connectionAttributes;
  }

  /**
   * Returns the request header attributes as a name-to-value map, building and caching it on first
   * use. An empty attribute list yields {@link Collections#emptyMap()}.
   */
  @Override
  public Map<String, byte[]> getRequestAttributes() {
    // Read the volatile field once; the map is only published after it is fully populated.
    Map<String, byte[]> attributes = this.requestAttributes;
    if (attributes == null) {
      if (header.getAttributeList().isEmpty()) {
        attributes = Collections.emptyMap();
      } else {
        attributes = Maps.newHashMapWithExpectedSize(header.getAttributeList().size());
        for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
          attributes.put(nameBytesPair.getName(), nameBytesPair.getValue().toByteArray());
        }
      }
      this.requestAttributes = attributes;
    }
    return attributes;
  }

  /**
   * Returns the value of a single request attribute, or {@code null} if it is absent. When the
   * attribute map has not been built yet, the header's attribute list is scanned directly instead
   * of materializing the whole map.
   */
  @Override
  public byte[] getRequestAttribute(String key) {
    Map<String, byte[]> attributes = this.requestAttributes;
    if (attributes == null) {
      for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
        if (nameBytesPair.getName().equals(key)) {
          return nameBytesPair.getValue().toByteArray();
        }
      }
      return null;
    }
    return attributes.get(key);
  }

  @Override
  public int getPriority() {
    return this.header.getPriority();
  }

  /*
   * Short string representation without param info because param itself could be huge depends on
   * the payload of a command
   */
  @Override
  public String toShortString() {
    // Guard the connection as well as its service: the constructor accepts a null connection.
    String serviceName = (this.connection != null && this.connection.service != null)
      ? this.connection.service.getDescriptorForType().getName()
      : "null";
    return "callId: " + this.id + " service: " + serviceName + " methodName: "
      + ((this.md != null) ? this.md.getName() : "n/a") + " size: "
      + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + " connection: "
      + connection + " deadline: " + deadline;
  }

  /**
   * Builds the response buffer chain for this call and then runs the rpc callback, if one is set.
   * <p>
   * If an error response has already been recorded, this method returns without doing anything.
   * If building the response fails with an {@link IOException}, the failure is logged and the
   * response is left {@code null}; the callback still runs.
   * @param m        the result message, may be {@code null}
   * @param cells    cells to ship in a cell block alongside the protobuf message, may be
   *                 {@code null}
   * @param t        the failure to report to the client, or {@code null} on success
   * @param errorMsg text stored as the stack trace of the exception response when {@code t} is
   *                 not {@code null}
   */
  @Override
  public synchronized void setResponse(Message m, final CellScanner cells, Throwable t,
    String errorMsg) {
    if (this.isError) {
      return;
    }
    if (t != null) {
      this.isError = true;
      TraceUtil.setError(span, t);
    } else {
      span.setStatus(StatusCode.OK);
    }
    BufferChain bc = null;
    try {
      ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
      // Call id.
      headerBuilder.setCallId(this.id);
      if (t != null) {
        setExceptionResponse(t, errorMsg, headerBuilder);
      }
      // Pass reservoir to buildCellBlock. Keep reference to returned so can add it back to the
      // reservoir when finished. This is hacky and the hack is not contained but benefits are
      // high when we can avoid a big buffer allocation on each rpc.
      List<ByteBuffer> cellBlock = null;
      int cellBlockSize = 0;
      if (bbAllocator.isReservoirEnabled()) {
        this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
          this.connection.compressionCodec, cells, bbAllocator);
        if (this.cellBlockStream != null) {
          cellBlock = this.cellBlockStream.getByteBuffers();
          cellBlockSize = this.cellBlockStream.size();
        }
      } else {
        ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec,
          this.connection.compressionCodec, cells);
        if (b != null) {
          cellBlockSize = b.remaining();
          cellBlock = new ArrayList<>(1);
          cellBlock.add(b);
        }
      }

      if (cellBlockSize > 0) {
        CellBlockMeta.Builder cellBlockMeta = CellBlockMeta.newBuilder();
        // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
        cellBlockMeta.setLength(cellBlockSize);
        headerBuilder.setCellBlockMeta(cellBlockMeta.build());
      }
      Message responseHeader = headerBuilder.build();
      ByteBuffer headerBuf =
        createHeaderAndMessageBytes(m, responseHeader, cellBlockSize, cellBlock);
      // The header/message buffer always goes first, followed by the cell block buffers (if any).
      int cellBlockBufferSize = cellBlock != null ? cellBlock.size() : 0;
      ByteBuffer[] responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
      responseBufs[0] = headerBuf;
      for (int i = 0; i < cellBlockBufferSize; i++) {
        responseBufs[i + 1] = cellBlock.get(i);
      }
      bc = new BufferChain(responseBufs);
    } catch (IOException e) {
      // Pass the throwable itself so the stack trace is not lost.
      RpcServer.LOG.warn("Exception while creating response", e);
    }
    this.response = bc;
    // Once a response message is created and set to this.response, this Call can be treated as
    // done. The Responder thread will do the n/w write of this message back to client.
    if (this.rpcCallback != null) {
      try (Scope ignored = span.makeCurrent()) {
        this.rpcCallback.run();
      } catch (Exception e) {
        // Don't allow any exception here to kill this handler thread.
        RpcServer.LOG.warn("Exception while running the Rpc Callback.", e);
        TraceUtil.setError(span, e);
      }
    }
  }

  /**
   * Populates {@code headerBuilder} with an {@link ExceptionResponse} describing {@code t}.
   * @param t             the failure; its class name is recorded, and it is flagged as
   *                      do-not-retry when it is a {@link DoNotRetryIOException}
   * @param errorMsg      text recorded as the stack trace field
   * @param headerBuilder the response header under construction
   */
  static void setExceptionResponse(Throwable t, String errorMsg,
    ResponseHeader.Builder headerBuilder) {
    ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
    exceptionBuilder.setExceptionClassName(t.getClass().getName());
    exceptionBuilder.setStackTrace(errorMsg);
    exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
    if (t instanceof RegionMovedException) {
      // Special casing for this exception. This is only one carrying a payload.
      // Do this instead of build a generic system for allowing exceptions carry
      // any kind of payload.
      RegionMovedException rme = (RegionMovedException) t;
      exceptionBuilder.setHostname(rme.getHostname());
      exceptionBuilder.setPort(rme.getPort());
    } else if (t instanceof HBaseServerException) {
      HBaseServerException hse = (HBaseServerException) t;
      exceptionBuilder.setServerOverloaded(hse.isServerOverloaded());
    }
    // Set the exception as the result of the method invocation.
    headerBuilder.setException(exceptionBuilder.build());
  }

  /**
   * Serializes the total-size prefix, the response header and the result message into a single
   * buffer.
   * <p>
   * The buffer holds a 4-byte total size, then the header, then the result, each message preceded
   * by its varint length. The total size written covers both messages with their length prefixes
   * plus {@code cellBlockSize}; it does not count the 4-byte size field itself.
   * <p>
   * When the last cell block buffer has enough spare capacity past its limit, the bytes are
   * written there and a duplicate spanning just those bytes is returned; otherwise a new heap
   * buffer is allocated.
   * @param result        the result message, may be {@code null}
   * @param header        the response header, may be {@code null}
   * @param cellBlockSize total number of cell block bytes that will follow
   * @param cellBlock     the cell block buffers; only dereferenced when {@code cellBlockSize > 0}
   * @return a buffer positioned at the start of the size prefix and limited to the end of the
   *         serialized messages
   */
  static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize,
    List<ByteBuffer> cellBlock) throws IOException {
    // Organize the response as a set of bytebuffers rather than collect it all together inside
    // one big byte array; save on allocations.
    // for writing the header, we check if there is available space in the buffers
    // created for the cellblock itself. If there is space for the header, we reuse
    // the last buffer in the cellblock. This applies to the cellblock created from the
    // pool or even the onheap cellblock buffer in case there is no pool enabled.
    // Possible reuse would avoid creating a temporary array for storing the header every time.
    ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
    int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, resultVintSize = 0;
    if (header != null) {
      headerSerializedSize = header.getSerializedSize();
      headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize);
    }
    if (result != null) {
      resultSerializedSize = result.getSerializedSize();
      resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize);
    }
    // calculate the total size
    int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize)
      + cellBlockSize;
    int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + resultVintSize
      + Bytes.SIZEOF_INT;
    // Only if the last buffer has enough space for header use it. Else allocate
    // a new buffer. Assume they are all flipped
    if (possiblePBBuf != null && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
      // duplicate the buffer. This is where the header is going to be written
      ByteBuffer pbBuf = possiblePBBuf.duplicate();
      // get the current limit
      int limit = pbBuf.limit();
      // Position such that we write the header to the end of the buffer
      pbBuf.position(limit);
      // limit to the header size
      pbBuf.limit(totalPBSize + limit);
      // mark the current position
      pbBuf.mark();
      writeToCOS(result, header, totalSize, pbBuf);
      // reset the buffer back to old position
      pbBuf.reset();
      return pbBuf;
    } else {
      return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
    }
  }

  /**
   * Writes {@code totalSize} as an int followed by the header and result messages (each skipped
   * when {@code null}) into {@code pbBuf}, and verifies that the writes filled the space the
   * stream was given.
   */
  private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
    throws IOException {
    ByteBufferUtils.putInt(pbBuf, totalSize);
    // create COS that works on BB
    CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
    if (header != null) {
      cos.writeMessageNoTag(header);
    }
    if (result != null) {
      cos.writeMessageNoTag(result);
    }
    cos.flush();
    cos.checkNoSpaceLeft();
  }

  /**
   * Allocates a fresh heap buffer of {@code totalPBSize} bytes, writes the size prefix and both
   * messages into it, and returns it flipped for reading.
   */
  private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
    int totalSize, int totalPBSize) throws IOException {
    ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
    writeToCOS(result, header, totalSize, pbBuf);
    pbBuf.flip();
    return pbBuf;
  }

  /**
   * Returns the time elapsed since this call was received if the connection is no longer open,
   * otherwise {@code -1}.
   */
  @Override
  public long disconnectSince() {
    if (!this.connection.isConnectionOpen()) {
      return EnvironmentEdgeManager.currentTime() - receiveTime;
    } else {
      return -1L;
    }
  }

  @Override
  public boolean isClientCellBlockSupported() {
    return this.connection != null && this.connection.codec != null;
  }

  @Override
  public long getResponseCellSize() {
    return responseCellSize;
  }

  @Override
  public void incrementResponseCellSize(long cellSize) {
    responseCellSize += cellSize;
  }

  @Override
  public long getBlockBytesScanned() {
    return responseBlockSize;
  }

  @Override
  public void incrementBlockBytesScanned(long blockSize) {
    responseBlockSize += blockSize;
  }

  @Override
  public long getResponseExceptionSize() {
    return exceptionSize;
  }

  @Override
  public void incrementResponseExceptionSize(long exSize) {
    exceptionSize += exSize;
  }

  @Override
  public long getSize() {
    return this.size;
  }

  @Override
  public long getDeadline() {
    return deadline;
  }

  /**
   * Returns the request user, or an empty {@link Optional} when there is none (for example when
   * the call was created without a connection).
   */
  @Override
  public Optional<User> getRequestUser() {
    return Optional.ofNullable(user);
  }

  @Override
  public InetAddress getRemoteAddress() {
    return remoteAddress;
  }

  @Override
  public VersionInfo getClientVersionInfo() {
    return connection.getVersionInfo();
  }

  @Override
  public synchronized void setCallBack(RpcCallback callback) {
    this.rpcCallback = callback;
  }

  @Override
  public boolean isRetryImmediatelySupported() {
    return retryImmediatelySupported;
  }

  @Override
  public BlockingService getService() {
    return service;
  }

  @Override
  public MethodDescriptor getMethod() {
    return md;
  }

  @Override
  public Message getParam() {
    return param;
  }

  @Override
  public CellScanner getCellScanner() {
    return cellScanner;
  }

  @Override
  public long getReceiveTime() {
    return receiveTime;
  }

  @Override
  public long getStartTime() {
    return startTime;
  }

  @Override
  public void setStartTime(long t) {
    this.startTime = t;
  }

  @Override
  public int getTimeout() {
    return timeout;
  }

  @Override
  public int getRemotePort() {
    return connection.getRemotePort();
  }

  @Override
  public synchronized BufferChain getResponse() {
    return response;
  }
}