001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.api.trace.StatusCode;
022import io.opentelemetry.context.Scope;
023import java.io.IOException;
024import java.net.InetAddress;
025import java.nio.ByteBuffer;
026import java.security.cert.X509Certificate;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.List;
030import java.util.Map;
031import java.util.Optional;
032import java.util.concurrent.atomic.AtomicInteger;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.ExtendedCellScanner;
035import org.apache.hadoop.hbase.HBaseServerException;
036import org.apache.hadoop.hbase.exceptions.RegionMovedException;
037import org.apache.hadoop.hbase.io.ByteBuffAllocator;
038import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
039import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.trace.TraceUtil;
042import org.apache.hadoop.hbase.util.ByteBufferUtils;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.hadoop.util.StringUtils;
046import org.apache.yetus.audience.InterfaceAudience;
047
048import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
050import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
052import org.apache.hbase.thirdparty.com.google.protobuf.Message;
053
054import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
061
062/**
 * Data structure that holds everything necessary for a method invocation and, afterward, carries
 * the result.
065 */
066@InterfaceAudience.Private
067public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
068
069  protected final int id; // the client's call id
070  protected final BlockingService service;
071  protected final MethodDescriptor md;
072  protected final RequestHeader header;
073  protected Message param; // the parameter passed
074  // Optional cell data passed outside of protobufs.
075  protected final ExtendedCellScanner cellScanner;
076  protected final T connection; // connection to client
077  protected final long receiveTime; // the time received when response is null
078  // the time served when response is not null
079  protected final int timeout;
080  protected long startTime;
081  protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
082
083  protected final ByteBuffAllocator bbAllocator;
084
085  protected final CellBlockBuilder cellBlockBuilder;
086
087  /**
088   * Chain of buffers to send as response.
089   */
090  protected BufferChain response;
091
092  protected final long size; // size of current call
093  protected boolean isError;
094  protected ByteBufferListOutputStream cellBlockStream = null;
095  protected CallCleanup reqCleanup = null;
096
097  protected final User user;
098  protected final InetAddress remoteAddress;
099  protected final X509Certificate[] clientCertificateChain;
100  protected RpcCallback rpcCallback;
101
102  private long responseCellSize = 0;
103  private long responseBlockSize = 0;
104  private long fsReadTimeMillis = 0;
105  // cumulative size of serialized exceptions
106  private long exceptionSize = 0;
107  private final boolean retryImmediatelySupported;
108  private volatile Map<String, byte[]> requestAttributes;
109
110  // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and
111  // the rest of the bits are for WAL reference count. We can only call release if all of them are
112  // zero. The reason why we can not use a general reference counting is that, we may call cleanup
113  // multiple times in the current implementation. We should fix this in the future.
114  // The refCount here will start as 0x80000000 and increment with every WAL reference and decrement
115  // from WAL side on release
116  private final AtomicInteger reference = new AtomicInteger(0x80000000);
117
118  private final Span span;
119
120  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
121      justification = "Can't figure why this complaint is happening... see below")
122  ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
123    Message param, ExtendedCellScanner cellScanner, T connection, long size,
124    InetAddress remoteAddress, long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
125    CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
126    this.id = id;
127    this.service = service;
128    this.md = md;
129    this.header = header;
130    this.param = param;
131    this.cellScanner = cellScanner;
132    this.connection = connection;
133    this.receiveTime = receiveTime;
134    this.response = null;
135    this.isError = false;
136    this.size = size;
137    if (connection != null) {
138      this.user = connection.user;
139      this.retryImmediatelySupported = connection.retryImmediatelySupported;
140      this.clientCertificateChain = connection.clientCertificateChain;
141    } else {
142      this.user = null;
143      this.retryImmediatelySupported = false;
144      this.clientCertificateChain = null;
145    }
146    this.remoteAddress = remoteAddress;
147    this.timeout = timeout;
148    this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
149    this.bbAllocator = byteBuffAllocator;
150    this.cellBlockBuilder = cellBlockBuilder;
151    this.reqCleanup = reqCleanup;
152    this.span = Span.current();
153  }
154
155  /**
156   * Call is done. Execution happened and we returned results to client. It is now safe to cleanup.
157   */
158  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
159      justification = "Presume the lock on processing request held by caller is protection enough")
160  @Override
161  public void done() {
162    if (this.cellBlockStream != null) {
163      // This will return back the BBs which we got from pool.
164      this.cellBlockStream.releaseResources();
165      this.cellBlockStream = null;
166    }
167    // If the call was run successfuly, we might have already returned the BB
168    // back to pool. No worries..Then inputCellBlock will be null
169    cleanup();
170    span.end();
171  }
172
173  @Override
174  public void cleanup() {
175    for (;;) {
176      int ref = reference.get();
177      if ((ref & 0x80000000) == 0) {
178        return;
179      }
180      int nextRef = ref & 0x7fffffff;
181      if (reference.compareAndSet(ref, nextRef)) {
182        if (nextRef == 0) {
183          if (this.reqCleanup != null) {
184            this.reqCleanup.run();
185          }
186        }
187        return;
188      }
189    }
190  }
191
192  public void retainByWAL() {
193    reference.incrementAndGet();
194  }
195
196  public void releaseByWAL() {
197    // Here this method of decrementAndGet for releasing WAL reference count will work in both
198    // cases - i.e. highest bit (cleanup) 1 or 0. We will be decrementing a negative or positive
199    // value respectively in these 2 cases, but the logic will work the same way
200    if (reference.decrementAndGet() == 0) {
201      if (this.reqCleanup != null) {
202        this.reqCleanup.run();
203      }
204    }
205
206  }
207
208  @Override
209  public String toString() {
210    return toShortString() + " param: "
211      + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + " connection: "
212      + connection.toString();
213  }
214
215  @Override
216  public RequestHeader getHeader() {
217    return this.header;
218  }
219
220  @Override
221  public Map<String, byte[]> getConnectionAttributes() {
222    return this.connection.connectionAttributes;
223  }
224
225  @Override
226  public Map<String, byte[]> getRequestAttributes() {
227    if (this.requestAttributes == null) {
228      if (header.getAttributeList().isEmpty()) {
229        this.requestAttributes = Collections.emptyMap();
230      } else {
231        Map<String, byte[]> requestAttributes =
232          Maps.newHashMapWithExpectedSize(header.getAttributeList().size());
233        for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
234          requestAttributes.put(nameBytesPair.getName(), nameBytesPair.getValue().toByteArray());
235        }
236        this.requestAttributes = requestAttributes;
237      }
238    }
239    return this.requestAttributes;
240  }
241
242  @Override
243  public byte[] getRequestAttribute(String key) {
244    if (this.requestAttributes == null) {
245      for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
246        if (nameBytesPair.getName().equals(key)) {
247          return nameBytesPair.getValue().toByteArray();
248        }
249      }
250      return null;
251    }
252    return this.requestAttributes.get(key);
253  }
254
255  @Override
256  public int getPriority() {
257    return this.header.getPriority();
258  }
259
260  /*
261   * Short string representation without param info because param itself could be huge depends on
262   * the payload of a command
263   */
264  @Override
265  public String toShortString() {
266    String serviceName = this.connection.service != null
267      ? this.connection.service.getDescriptorForType().getName()
268      : "null";
269    return "callId: " + this.id + " service: " + serviceName + " methodName: "
270      + ((this.md != null) ? this.md.getName() : "n/a") + " size: "
271      + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + " connection: "
272      + connection + " deadline: " + deadline;
273  }
274
275  @Override
276  public synchronized void setResponse(Message m, final ExtendedCellScanner cells, Throwable t,
277    String errorMsg) {
278    if (this.isError) {
279      return;
280    }
281    if (t != null) {
282      this.isError = true;
283      TraceUtil.setError(span, t);
284    } else {
285      span.setStatus(StatusCode.OK);
286    }
287    BufferChain bc = null;
288    try {
289      ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
290      // Call id.
291      headerBuilder.setCallId(this.id);
292      if (t != null) {
293        setExceptionResponse(t, errorMsg, headerBuilder);
294      }
295      // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
296      // reservoir when finished. This is hacky and the hack is not contained but benefits are
297      // high when we can avoid a big buffer allocation on each rpc.
298      List<ByteBuffer> cellBlock = null;
299      int cellBlockSize = 0;
300      if (bbAllocator.isReservoirEnabled()) {
301        this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
302          this.connection.compressionCodec, cells, bbAllocator);
303        if (this.cellBlockStream != null) {
304          cellBlock = this.cellBlockStream.getByteBuffers();
305          cellBlockSize = this.cellBlockStream.size();
306        }
307      } else {
308        ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec,
309          this.connection.compressionCodec, cells);
310        if (b != null) {
311          cellBlockSize = b.remaining();
312          cellBlock = new ArrayList<>(1);
313          cellBlock.add(b);
314        }
315      }
316
317      if (cellBlockSize > 0) {
318        CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
319        // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
320        cellBlockBuilder.setLength(cellBlockSize);
321        headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
322      }
323      Message header = headerBuilder.build();
324      ByteBuffer headerBuf = createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
325      ByteBuffer[] responseBufs = null;
326      int cellBlockBufferSize = 0;
327      if (cellBlock != null) {
328        cellBlockBufferSize = cellBlock.size();
329        responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
330      } else {
331        responseBufs = new ByteBuffer[1];
332      }
333      responseBufs[0] = headerBuf;
334      if (cellBlock != null) {
335        for (int i = 0; i < cellBlockBufferSize; i++) {
336          responseBufs[i + 1] = cellBlock.get(i);
337        }
338      }
339      bc = new BufferChain(responseBufs);
340    } catch (IOException e) {
341      RpcServer.LOG.warn("Exception while creating response " + e);
342      bc = createFallbackErrorResponse(e);
343    }
344    this.response = bc;
345    // Once a response message is created and set to this.response, this Call can be treated as
346    // done. The Responder thread will do the n/w write of this message back to client.
347    if (this.rpcCallback != null) {
348      try (Scope ignored = span.makeCurrent()) {
349        this.rpcCallback.run();
350      } catch (Exception e) {
351        // Don't allow any exception here to kill this handler thread.
352        RpcServer.LOG.warn("Exception while running the Rpc Callback.", e);
353        TraceUtil.setError(span, e);
354      }
355    }
356  }
357
358  static void setExceptionResponse(Throwable t, String errorMsg,
359    ResponseHeader.Builder headerBuilder) {
360    ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
361    exceptionBuilder.setExceptionClassName(t.getClass().getName());
362    exceptionBuilder.setStackTrace(errorMsg);
363    exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
364    if (t instanceof RegionMovedException) {
365      // Special casing for this exception. This is only one carrying a payload.
366      // Do this instead of build a generic system for allowing exceptions carry
367      // any kind of payload.
368      RegionMovedException rme = (RegionMovedException) t;
369      exceptionBuilder.setHostname(rme.getHostname());
370      exceptionBuilder.setPort(rme.getPort());
371    } else if (t instanceof HBaseServerException) {
372      HBaseServerException hse = (HBaseServerException) t;
373      exceptionBuilder.setServerOverloaded(hse.isServerOverloaded());
374    }
375    // Set the exception as the result of the method invocation.
376    headerBuilder.setException(exceptionBuilder.build());
377  }
378
379  /*
380   * Creates a fallback error response when the primary response creation fails. This method is
381   * invoked as a last resort when an IOException occurs during the normal response creation
382   * process. It attempts to create a minimal error response containing only the error information,
383   * without any cell blocks or additional data. The purpose is to ensure that the client receives
384   * some indication of the failure rather than experiencing a silent connection drop. This provides
385   * better error handling on the client side.
386   */
387  private BufferChain createFallbackErrorResponse(IOException originalException) {
388    try {
389      ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
390      headerBuilder.setCallId(this.id);
391      String responseErrorMsg =
392        "Failed to create response due to: " + originalException.getMessage();
393      setExceptionResponse(originalException, responseErrorMsg, headerBuilder);
394      Message header = headerBuilder.build();
395      ByteBuffer headerBuf = createHeaderAndMessageBytes(null, header, 0, null);
396      this.isError = true;
397      return new BufferChain(new ByteBuffer[] { headerBuf });
398    } catch (IOException e) {
399      RpcServer.LOG.error("Failed to create error response for client, connection may be dropped",
400        e);
401      return null;
402    }
403  }
404
405  static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize,
406    List<ByteBuffer> cellBlock) throws IOException {
407    // Organize the response as a set of bytebuffers rather than collect it all together inside
408    // one big byte array; save on allocations.
409    // for writing the header, we check if there is available space in the buffers
410    // created for the cellblock itself. If there is space for the header, we reuse
411    // the last buffer in the cellblock. This applies to the cellblock created from the
412    // pool or even the onheap cellblock buffer in case there is no pool enabled.
413    // Possible reuse would avoid creating a temporary array for storing the header every time.
414    ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
415    int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, resultVintSize = 0;
416    if (header != null) {
417      headerSerializedSize = header.getSerializedSize();
418      headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize);
419    }
420    if (result != null) {
421      resultSerializedSize = result.getSerializedSize();
422      resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize);
423    }
424    // calculate the total size
425    int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize)
426      + cellBlockSize;
427    int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + resultVintSize
428      + Bytes.SIZEOF_INT;
429    // Only if the last buffer has enough space for header use it. Else allocate
430    // a new buffer. Assume they are all flipped
431    if (possiblePBBuf != null && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
432      // duplicate the buffer. This is where the header is going to be written
433      ByteBuffer pbBuf = possiblePBBuf.duplicate();
434      // get the current limit
435      int limit = pbBuf.limit();
436      // Position such that we write the header to the end of the buffer
437      pbBuf.position(limit);
438      // limit to the header size
439      pbBuf.limit(totalPBSize + limit);
440      // mark the current position
441      pbBuf.mark();
442      writeToCOS(result, header, totalSize, pbBuf);
443      // reset the buffer back to old position
444      pbBuf.reset();
445      return pbBuf;
446    } else {
447      return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
448    }
449  }
450
451  private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
452    throws IOException {
453    ByteBufferUtils.putInt(pbBuf, totalSize);
454    // create COS that works on BB
455    CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
456    if (header != null) {
457      cos.writeMessageNoTag(header);
458    }
459    if (result != null) {
460      cos.writeMessageNoTag(result);
461    }
462    cos.flush();
463    cos.checkNoSpaceLeft();
464  }
465
466  private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
467    int totalSize, int totalPBSize) throws IOException {
468    ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
469    writeToCOS(result, header, totalSize, pbBuf);
470    pbBuf.flip();
471    return pbBuf;
472  }
473
474  @Override
475  public long disconnectSince() {
476    if (!this.connection.isConnectionOpen()) {
477      return EnvironmentEdgeManager.currentTime() - receiveTime;
478    } else {
479      return -1L;
480    }
481  }
482
483  @Override
484  public boolean isClientCellBlockSupported() {
485    return this.connection != null && this.connection.codec != null;
486  }
487
488  @Override
489  public long getResponseCellSize() {
490    return responseCellSize;
491  }
492
493  @Override
494  public void incrementResponseCellSize(long cellSize) {
495    responseCellSize += cellSize;
496  }
497
498  @Override
499  public long getBlockBytesScanned() {
500    return responseBlockSize;
501  }
502
503  @Override
504  public void incrementBlockBytesScanned(long blockSize) {
505    responseBlockSize += blockSize;
506  }
507
508  @Override
509  public long getResponseExceptionSize() {
510    return exceptionSize;
511  }
512
513  @Override
514  public void incrementResponseExceptionSize(long exSize) {
515    exceptionSize += exSize;
516  }
517
518  @Override
519  public long getSize() {
520    return this.size;
521  }
522
523  @Override
524  public long getDeadline() {
525    return deadline;
526  }
527
528  @Override
529  public Optional<User> getRequestUser() {
530    return Optional.ofNullable(user);
531  }
532
533  @Override
534  public Optional<X509Certificate[]> getClientCertificateChain() {
535    return Optional.ofNullable(clientCertificateChain);
536  }
537
538  @Override
539  public InetAddress getRemoteAddress() {
540    return remoteAddress;
541  }
542
543  @Override
544  public VersionInfo getClientVersionInfo() {
545    return connection.getVersionInfo();
546  }
547
548  @Override
549  public synchronized void setCallBack(RpcCallback callback) {
550    this.rpcCallback = callback;
551  }
552
553  @Override
554  public boolean isRetryImmediatelySupported() {
555    return retryImmediatelySupported;
556  }
557
558  @Override
559  public BlockingService getService() {
560    return service;
561  }
562
563  @Override
564  public MethodDescriptor getMethod() {
565    return md;
566  }
567
568  @Override
569  public Message getParam() {
570    return param;
571  }
572
573  @Override
574  public ExtendedCellScanner getCellScanner() {
575    return cellScanner;
576  }
577
578  @Override
579  public long getReceiveTime() {
580    return receiveTime;
581  }
582
583  @Override
584  public long getStartTime() {
585    return startTime;
586  }
587
588  @Override
589  public void setStartTime(long t) {
590    this.startTime = t;
591  }
592
593  @Override
594  public int getTimeout() {
595    return timeout;
596  }
597
598  @Override
599  public int getRemotePort() {
600    return connection.getRemotePort();
601  }
602
603  @Override
604  public synchronized BufferChain getResponse() {
605    return response;
606  }
607
608  @Override
609  public void updateFsReadTime(long latencyMillis) {
610    fsReadTimeMillis += latencyMillis;
611  }
612
613  @Override
614  public long getFsReadTime() {
615    return fsReadTimeMillis;
616  }
617}