001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.api.trace.StatusCode;
022import io.opentelemetry.context.Scope;
023import java.io.IOException;
024import java.net.InetAddress;
025import java.nio.ByteBuffer;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.concurrent.atomic.AtomicInteger;
032import org.apache.hadoop.hbase.CellScanner;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseServerException;
035import org.apache.hadoop.hbase.exceptions.RegionMovedException;
036import org.apache.hadoop.hbase.io.ByteBuffAllocator;
037import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
038import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.hadoop.hbase.trace.TraceUtil;
041import org.apache.hadoop.hbase.util.ByteBufferUtils;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.hadoop.util.StringUtils;
045import org.apache.yetus.audience.InterfaceAudience;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
048import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
049import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
051import org.apache.hbase.thirdparty.com.google.protobuf.Message;
052
053import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
060
061/**
062 * Datastructure that holds all necessary to a method invocation and then afterward, carries the
063 * result.
064 */
065@InterfaceAudience.Private
066public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
067
068  protected final int id; // the client's call id
069  protected final BlockingService service;
070  protected final MethodDescriptor md;
071  protected final RequestHeader header;
072  protected Message param; // the parameter passed
073  // Optional cell data passed outside of protobufs.
074  protected final CellScanner cellScanner;
075  protected final T connection; // connection to client
076  protected final long receiveTime; // the time received when response is null
077  // the time served when response is not null
078  protected final int timeout;
079  protected long startTime;
080  protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
081
082  protected final ByteBuffAllocator bbAllocator;
083
084  protected final CellBlockBuilder cellBlockBuilder;
085
086  /**
087   * Chain of buffers to send as response.
088   */
089  protected BufferChain response;
090
091  protected final long size; // size of current call
092  protected boolean isError;
093  protected ByteBufferListOutputStream cellBlockStream = null;
094  protected CallCleanup reqCleanup = null;
095
096  protected final User user;
097  protected final InetAddress remoteAddress;
098  protected RpcCallback rpcCallback;
099
100  private long responseCellSize = 0;
101  private long responseBlockSize = 0;
102  // cumulative size of serialized exceptions
103  private long exceptionSize = 0;
104  private final boolean retryImmediatelySupported;
105  private volatile Map<String, byte[]> requestAttributes;
106
107  // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and
108  // the rest of the bits are for WAL reference count. We can only call release if all of them are
109  // zero. The reason why we can not use a general reference counting is that, we may call cleanup
110  // multiple times in the current implementation. We should fix this in the future.
111  // The refCount here will start as 0x80000000 and increment with every WAL reference and decrement
112  // from WAL side on release
113  private final AtomicInteger reference = new AtomicInteger(0x80000000);
114
115  private final Span span;
116
117  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
118      justification = "Can't figure why this complaint is happening... see below")
119  ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
120    Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress,
121    long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
122    CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
123    this.id = id;
124    this.service = service;
125    this.md = md;
126    this.header = header;
127    this.param = param;
128    this.cellScanner = cellScanner;
129    this.connection = connection;
130    this.receiveTime = receiveTime;
131    this.response = null;
132    this.isError = false;
133    this.size = size;
134    if (connection != null) {
135      this.user = connection.user;
136      this.retryImmediatelySupported = connection.retryImmediatelySupported;
137    } else {
138      this.user = null;
139      this.retryImmediatelySupported = false;
140    }
141    this.remoteAddress = remoteAddress;
142    this.timeout = timeout;
143    this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
144    this.bbAllocator = byteBuffAllocator;
145    this.cellBlockBuilder = cellBlockBuilder;
146    this.reqCleanup = reqCleanup;
147    this.span = Span.current();
148  }
149
150  /**
151   * Call is done. Execution happened and we returned results to client. It is now safe to cleanup.
152   */
153  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
154      justification = "Presume the lock on processing request held by caller is protection enough")
155  @Override
156  public void done() {
157    if (this.cellBlockStream != null) {
158      // This will return back the BBs which we got from pool.
159      this.cellBlockStream.releaseResources();
160      this.cellBlockStream = null;
161    }
162    // If the call was run successfuly, we might have already returned the BB
163    // back to pool. No worries..Then inputCellBlock will be null
164    cleanup();
165    span.end();
166  }
167
168  @Override
169  public void cleanup() {
170    for (;;) {
171      int ref = reference.get();
172      if ((ref & 0x80000000) == 0) {
173        return;
174      }
175      int nextRef = ref & 0x7fffffff;
176      if (reference.compareAndSet(ref, nextRef)) {
177        if (nextRef == 0) {
178          if (this.reqCleanup != null) {
179            this.reqCleanup.run();
180          }
181        }
182        return;
183      }
184    }
185  }
186
187  public void retainByWAL() {
188    reference.incrementAndGet();
189  }
190
191  public void releaseByWAL() {
192    // Here this method of decrementAndGet for releasing WAL reference count will work in both
193    // cases - i.e. highest bit (cleanup) 1 or 0. We will be decrementing a negative or positive
194    // value respectively in these 2 cases, but the logic will work the same way
195    if (reference.decrementAndGet() == 0) {
196      if (this.reqCleanup != null) {
197        this.reqCleanup.run();
198      }
199    }
200
201  }
202
203  @Override
204  public String toString() {
205    return toShortString() + " param: "
206      + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + " connection: "
207      + connection.toString();
208  }
209
210  @Override
211  public RequestHeader getHeader() {
212    return this.header;
213  }
214
215  @Override
216  public Map<String, byte[]> getConnectionAttributes() {
217    return this.connection.connectionAttributes;
218  }
219
220  @Override
221  public Map<String, byte[]> getRequestAttributes() {
222    if (this.requestAttributes == null) {
223      if (header.getAttributeList().isEmpty()) {
224        this.requestAttributes = Collections.emptyMap();
225      } else {
226        Map<String, byte[]> requestAttributes =
227          Maps.newHashMapWithExpectedSize(header.getAttributeList().size());
228        for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
229          requestAttributes.put(nameBytesPair.getName(), nameBytesPair.getValue().toByteArray());
230        }
231        this.requestAttributes = requestAttributes;
232      }
233    }
234    return this.requestAttributes;
235  }
236
237  @Override
238  public byte[] getRequestAttribute(String key) {
239    if (this.requestAttributes == null) {
240      for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
241        if (nameBytesPair.getName().equals(key)) {
242          return nameBytesPair.getValue().toByteArray();
243        }
244      }
245      return null;
246    }
247    return this.requestAttributes.get(key);
248  }
249
250  @Override
251  public int getPriority() {
252    return this.header.getPriority();
253  }
254
255  /*
256   * Short string representation without param info because param itself could be huge depends on
257   * the payload of a command
258   */
259  @Override
260  public String toShortString() {
261    String serviceName = this.connection.service != null
262      ? this.connection.service.getDescriptorForType().getName()
263      : "null";
264    return "callId: " + this.id + " service: " + serviceName + " methodName: "
265      + ((this.md != null) ? this.md.getName() : "n/a") + " size: "
266      + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + " connection: "
267      + connection + " deadline: " + deadline;
268  }
269
270  @Override
271  public synchronized void setResponse(Message m, final CellScanner cells, Throwable t,
272    String errorMsg) {
273    if (this.isError) {
274      return;
275    }
276    if (t != null) {
277      this.isError = true;
278      TraceUtil.setError(span, t);
279    } else {
280      span.setStatus(StatusCode.OK);
281    }
282    BufferChain bc = null;
283    try {
284      ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
285      // Call id.
286      headerBuilder.setCallId(this.id);
287      if (t != null) {
288        setExceptionResponse(t, errorMsg, headerBuilder);
289      }
290      // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
291      // reservoir when finished. This is hacky and the hack is not contained but benefits are
292      // high when we can avoid a big buffer allocation on each rpc.
293      List<ByteBuffer> cellBlock = null;
294      int cellBlockSize = 0;
295      if (bbAllocator.isReservoirEnabled()) {
296        this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
297          this.connection.compressionCodec, cells, bbAllocator);
298        if (this.cellBlockStream != null) {
299          cellBlock = this.cellBlockStream.getByteBuffers();
300          cellBlockSize = this.cellBlockStream.size();
301        }
302      } else {
303        ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec,
304          this.connection.compressionCodec, cells);
305        if (b != null) {
306          cellBlockSize = b.remaining();
307          cellBlock = new ArrayList<>(1);
308          cellBlock.add(b);
309        }
310      }
311
312      if (cellBlockSize > 0) {
313        CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
314        // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
315        cellBlockBuilder.setLength(cellBlockSize);
316        headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
317      }
318      Message header = headerBuilder.build();
319      ByteBuffer headerBuf = createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
320      ByteBuffer[] responseBufs = null;
321      int cellBlockBufferSize = 0;
322      if (cellBlock != null) {
323        cellBlockBufferSize = cellBlock.size();
324        responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
325      } else {
326        responseBufs = new ByteBuffer[1];
327      }
328      responseBufs[0] = headerBuf;
329      if (cellBlock != null) {
330        for (int i = 0; i < cellBlockBufferSize; i++) {
331          responseBufs[i + 1] = cellBlock.get(i);
332        }
333      }
334      bc = new BufferChain(responseBufs);
335    } catch (IOException e) {
336      RpcServer.LOG.warn("Exception while creating response " + e);
337    }
338    this.response = bc;
339    // Once a response message is created and set to this.response, this Call can be treated as
340    // done. The Responder thread will do the n/w write of this message back to client.
341    if (this.rpcCallback != null) {
342      try (Scope ignored = span.makeCurrent()) {
343        this.rpcCallback.run();
344      } catch (Exception e) {
345        // Don't allow any exception here to kill this handler thread.
346        RpcServer.LOG.warn("Exception while running the Rpc Callback.", e);
347        TraceUtil.setError(span, e);
348      }
349    }
350  }
351
352  static void setExceptionResponse(Throwable t, String errorMsg,
353    ResponseHeader.Builder headerBuilder) {
354    ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
355    exceptionBuilder.setExceptionClassName(t.getClass().getName());
356    exceptionBuilder.setStackTrace(errorMsg);
357    exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
358    if (t instanceof RegionMovedException) {
359      // Special casing for this exception. This is only one carrying a payload.
360      // Do this instead of build a generic system for allowing exceptions carry
361      // any kind of payload.
362      RegionMovedException rme = (RegionMovedException) t;
363      exceptionBuilder.setHostname(rme.getHostname());
364      exceptionBuilder.setPort(rme.getPort());
365    } else if (t instanceof HBaseServerException) {
366      HBaseServerException hse = (HBaseServerException) t;
367      exceptionBuilder.setServerOverloaded(hse.isServerOverloaded());
368    }
369    // Set the exception as the result of the method invocation.
370    headerBuilder.setException(exceptionBuilder.build());
371  }
372
373  static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize,
374    List<ByteBuffer> cellBlock) throws IOException {
375    // Organize the response as a set of bytebuffers rather than collect it all together inside
376    // one big byte array; save on allocations.
377    // for writing the header, we check if there is available space in the buffers
378    // created for the cellblock itself. If there is space for the header, we reuse
379    // the last buffer in the cellblock. This applies to the cellblock created from the
380    // pool or even the onheap cellblock buffer in case there is no pool enabled.
381    // Possible reuse would avoid creating a temporary array for storing the header every time.
382    ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
383    int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, resultVintSize = 0;
384    if (header != null) {
385      headerSerializedSize = header.getSerializedSize();
386      headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize);
387    }
388    if (result != null) {
389      resultSerializedSize = result.getSerializedSize();
390      resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize);
391    }
392    // calculate the total size
393    int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize)
394      + cellBlockSize;
395    int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + resultVintSize
396      + Bytes.SIZEOF_INT;
397    // Only if the last buffer has enough space for header use it. Else allocate
398    // a new buffer. Assume they are all flipped
399    if (possiblePBBuf != null && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
400      // duplicate the buffer. This is where the header is going to be written
401      ByteBuffer pbBuf = possiblePBBuf.duplicate();
402      // get the current limit
403      int limit = pbBuf.limit();
404      // Position such that we write the header to the end of the buffer
405      pbBuf.position(limit);
406      // limit to the header size
407      pbBuf.limit(totalPBSize + limit);
408      // mark the current position
409      pbBuf.mark();
410      writeToCOS(result, header, totalSize, pbBuf);
411      // reset the buffer back to old position
412      pbBuf.reset();
413      return pbBuf;
414    } else {
415      return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
416    }
417  }
418
419  private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
420    throws IOException {
421    ByteBufferUtils.putInt(pbBuf, totalSize);
422    // create COS that works on BB
423    CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
424    if (header != null) {
425      cos.writeMessageNoTag(header);
426    }
427    if (result != null) {
428      cos.writeMessageNoTag(result);
429    }
430    cos.flush();
431    cos.checkNoSpaceLeft();
432  }
433
434  private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
435    int totalSize, int totalPBSize) throws IOException {
436    ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
437    writeToCOS(result, header, totalSize, pbBuf);
438    pbBuf.flip();
439    return pbBuf;
440  }
441
442  @Override
443  public long disconnectSince() {
444    if (!this.connection.isConnectionOpen()) {
445      return EnvironmentEdgeManager.currentTime() - receiveTime;
446    } else {
447      return -1L;
448    }
449  }
450
451  @Override
452  public boolean isClientCellBlockSupported() {
453    return this.connection != null && this.connection.codec != null;
454  }
455
456  @Override
457  public long getResponseCellSize() {
458    return responseCellSize;
459  }
460
461  @Override
462  public void incrementResponseCellSize(long cellSize) {
463    responseCellSize += cellSize;
464  }
465
466  @Override
467  public long getBlockBytesScanned() {
468    return responseBlockSize;
469  }
470
471  @Override
472  public void incrementBlockBytesScanned(long blockSize) {
473    responseBlockSize += blockSize;
474  }
475
476  @Override
477  public long getResponseExceptionSize() {
478    return exceptionSize;
479  }
480
481  @Override
482  public void incrementResponseExceptionSize(long exSize) {
483    exceptionSize += exSize;
484  }
485
486  @Override
487  public long getSize() {
488    return this.size;
489  }
490
491  @Override
492  public long getDeadline() {
493    return deadline;
494  }
495
496  @Override
497  public Optional<User> getRequestUser() {
498    return Optional.ofNullable(user);
499  }
500
501  @Override
502  public InetAddress getRemoteAddress() {
503    return remoteAddress;
504  }
505
506  @Override
507  public VersionInfo getClientVersionInfo() {
508    return connection.getVersionInfo();
509  }
510
511  @Override
512  public synchronized void setCallBack(RpcCallback callback) {
513    this.rpcCallback = callback;
514  }
515
516  @Override
517  public boolean isRetryImmediatelySupported() {
518    return retryImmediatelySupported;
519  }
520
521  @Override
522  public BlockingService getService() {
523    return service;
524  }
525
526  @Override
527  public MethodDescriptor getMethod() {
528    return md;
529  }
530
531  @Override
532  public Message getParam() {
533    return param;
534  }
535
536  @Override
537  public CellScanner getCellScanner() {
538    return cellScanner;
539  }
540
541  @Override
542  public long getReceiveTime() {
543    return receiveTime;
544  }
545
546  @Override
547  public long getStartTime() {
548    return startTime;
549  }
550
551  @Override
552  public void setStartTime(long t) {
553    this.startTime = t;
554  }
555
556  @Override
557  public int getTimeout() {
558    return timeout;
559  }
560
561  @Override
562  public int getRemotePort() {
563    return connection.getRemotePort();
564  }
565
566  @Override
567  public synchronized BufferChain getResponse() {
568    return response;
569  }
570}