001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import io.opentelemetry.api.trace.Span;
022import io.opentelemetry.api.trace.StatusCode;
023import io.opentelemetry.context.Scope;
024import java.io.IOException;
025import java.net.InetAddress;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.atomic.AtomicInteger;
031import org.apache.hadoop.hbase.CellScanner;
032import org.apache.hadoop.hbase.DoNotRetryIOException;
033import org.apache.hadoop.hbase.HBaseServerException;
034import org.apache.hadoop.hbase.exceptions.RegionMovedException;
035import org.apache.hadoop.hbase.io.ByteBuffAllocator;
036import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
037import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
038import org.apache.hadoop.hbase.security.User;
039import org.apache.hadoop.hbase.trace.TraceUtil;
040import org.apache.hadoop.hbase.util.ByteBufferUtils;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.util.StringUtils;
044import org.apache.yetus.audience.InterfaceAudience;
045
046import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
047import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
048import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
049import org.apache.hbase.thirdparty.com.google.protobuf.Message;
050
051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
057
/**
 * Data structure that holds everything necessary for a method invocation and, afterward, carries
 * the result.
 */
062@InterfaceAudience.Private
063public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
064
  protected final int id; // the client's call id
  protected final BlockingService service;
  protected final MethodDescriptor md;
  protected final RequestHeader header;
  protected Message param; // the parameter passed
  // Optional cell data passed outside of protobufs.
  protected final CellScanner cellScanner;
  // Connection to the client. May be null: the constructor and isClientCellBlockSupported() both
  // check for that.
  protected final T connection;
  // Time the request was received, as handed to the constructor. The deadline is derived from it
  // and disconnectSince() reports elapsed time relative to it.
  protected final long receiveTime;
  // Timeout handed to the constructor, in the same unit as receiveTime. A value <= 0 means the
  // call has no deadline.
  protected final int timeout;
  // Set through setStartTime(long); not assigned by the constructor.
  protected long startTime;
  // The deadline to handle this call; if exceeded we can drop it. Computed in the constructor as
  // receiveTime + timeout when timeout > 0, otherwise Long.MAX_VALUE.
  protected final long deadline;

  // Allocator passed to the cell block builder when its reservoir is enabled (see setResponse).
  protected final ByteBuffAllocator bbAllocator;

  // Builds the cell block part of the response from the cells handed to setResponse.
  protected final CellBlockBuilder cellBlockBuilder;

  /**
   * Chain of buffers to send as response. Assigned by setResponse; stays null if building the
   * response failed with an IOException.
   */
  protected BufferChain response;

  protected final long size; // size of current call
  // Set once setResponse has been given a non-null Throwable; later setResponse calls are ignored.
  protected boolean isError;
  // Stream backing the response cell block when the allocator's reservoir is enabled; its
  // resources are released in done().
  protected ByteBufferListOutputStream cellBlockStream = null;
  // Optional hook, run once the cleanup bit is cleared and no WAL references remain (see
  // cleanup() and releaseByWAL()).
  protected CallCleanup reqCleanup = null;

  // Taken from the connection in the constructor; null when there is no connection.
  protected final User user;
  protected final InetAddress remoteAddress;
  // Optional callback, run at the end of setResponse.
  protected RpcCallback rpcCallback;

  // Running totals, only changed through the matching increment* methods.
  private long responseCellSize = 0;
  private long responseBlockSize = 0;
  // cumulative size of serialized exceptions
  private long exceptionSize = 0;
  // Copied from the connection in the constructor; false when there is no connection.
  private final boolean retryImmediatelySupported;

  // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and
  // the rest of the bits are for the WAL reference count. We can only call release if all of them
  // are zero. The reason why we can not use general reference counting is that we may call
  // cleanup multiple times in the current implementation. We should fix this in the future.
  // The refCount here starts as 0x80000000, is incremented with every WAL reference, and is
  // decremented from the WAL side on release.
  private final AtomicInteger reference = new AtomicInteger(0x80000000);

  // Span that was current when this call was constructed; ended in done().
  private final Span span;
112
113  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
114      justification = "Can't figure why this complaint is happening... see below")
115  ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
116    Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress,
117    long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
118    CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
119    this.id = id;
120    this.service = service;
121    this.md = md;
122    this.header = header;
123    this.param = param;
124    this.cellScanner = cellScanner;
125    this.connection = connection;
126    this.receiveTime = receiveTime;
127    this.response = null;
128    this.isError = false;
129    this.size = size;
130    if (connection != null) {
131      this.user = connection.user;
132      this.retryImmediatelySupported = connection.retryImmediatelySupported;
133    } else {
134      this.user = null;
135      this.retryImmediatelySupported = false;
136    }
137    this.remoteAddress = remoteAddress;
138    this.timeout = timeout;
139    this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
140    this.bbAllocator = byteBuffAllocator;
141    this.cellBlockBuilder = cellBlockBuilder;
142    this.reqCleanup = reqCleanup;
143    this.span = Span.current();
144  }
145
146  /**
147   * Call is done. Execution happened and we returned results to client. It is now safe to cleanup.
148   */
149  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
150      justification = "Presume the lock on processing request held by caller is protection enough")
151  @Override
152  public void done() {
153    if (this.cellBlockStream != null) {
154      // This will return back the BBs which we got from pool.
155      this.cellBlockStream.releaseResources();
156      this.cellBlockStream = null;
157    }
158    // If the call was run successfuly, we might have already returned the BB
159    // back to pool. No worries..Then inputCellBlock will be null
160    cleanup();
161    span.end();
162  }
163
164  @Override
165  public void cleanup() {
166    for (;;) {
167      int ref = reference.get();
168      if ((ref & 0x80000000) == 0) {
169        return;
170      }
171      int nextRef = ref & 0x7fffffff;
172      if (reference.compareAndSet(ref, nextRef)) {
173        if (nextRef == 0) {
174          if (this.reqCleanup != null) {
175            this.reqCleanup.run();
176          }
177        }
178        return;
179      }
180    }
181  }
182
183  public void retainByWAL() {
184    reference.incrementAndGet();
185  }
186
187  public void releaseByWAL() {
188    // Here this method of decrementAndGet for releasing WAL reference count will work in both
189    // cases - i.e. highest bit (cleanup) 1 or 0. We will be decrementing a negative or positive
190    // value respectively in these 2 cases, but the logic will work the same way
191    if (reference.decrementAndGet() == 0) {
192      if (this.reqCleanup != null) {
193        this.reqCleanup.run();
194      }
195    }
196  }
197
198  @Override
199  public String toString() {
200    return toShortString() + " param: "
201      + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + " connection: "
202      + connection.toString();
203  }
204
205  @Override
206  public RequestHeader getHeader() {
207    return this.header;
208  }
209
210  @Override
211  public int getPriority() {
212    return this.header.getPriority();
213  }
214
215  /*
216   * Short string representation without param info because param itself could be huge depends on
217   * the payload of a command
218   */
219  @Override
220  public String toShortString() {
221    String serviceName = this.connection.service != null
222      ? this.connection.service.getDescriptorForType().getName()
223      : "null";
224    return "callId: " + this.id + " service: " + serviceName + " methodName: "
225      + ((this.md != null) ? this.md.getName() : "n/a") + " size: "
226      + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + " connection: "
227      + connection + " deadline: " + deadline;
228  }
229
230  @Override
231  public synchronized void setResponse(Message m, final CellScanner cells, Throwable t,
232    String errorMsg) {
233    if (this.isError) {
234      return;
235    }
236    if (t != null) {
237      this.isError = true;
238      TraceUtil.setError(span, t);
239    } else {
240      span.setStatus(StatusCode.OK);
241    }
242    BufferChain bc = null;
243    try {
244      ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
245      // Call id.
246      headerBuilder.setCallId(this.id);
247      if (t != null) {
248        setExceptionResponse(t, errorMsg, headerBuilder);
249      }
250      // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
251      // reservoir when finished. This is hacky and the hack is not contained but benefits are
252      // high when we can avoid a big buffer allocation on each rpc.
253      List<ByteBuffer> cellBlock = null;
254      int cellBlockSize = 0;
255      if (bbAllocator.isReservoirEnabled()) {
256        this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
257          this.connection.compressionCodec, cells, bbAllocator);
258        if (this.cellBlockStream != null) {
259          cellBlock = this.cellBlockStream.getByteBuffers();
260          cellBlockSize = this.cellBlockStream.size();
261        }
262      } else {
263        ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec,
264          this.connection.compressionCodec, cells);
265        if (b != null) {
266          cellBlockSize = b.remaining();
267          cellBlock = new ArrayList<>(1);
268          cellBlock.add(b);
269        }
270      }
271
272      if (cellBlockSize > 0) {
273        CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
274        // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
275        cellBlockBuilder.setLength(cellBlockSize);
276        headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
277      }
278      Message header = headerBuilder.build();
279      ByteBuffer headerBuf = createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
280      ByteBuffer[] responseBufs = null;
281      int cellBlockBufferSize = 0;
282      if (cellBlock != null) {
283        cellBlockBufferSize = cellBlock.size();
284        responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
285      } else {
286        responseBufs = new ByteBuffer[1];
287      }
288      responseBufs[0] = headerBuf;
289      if (cellBlock != null) {
290        for (int i = 0; i < cellBlockBufferSize; i++) {
291          responseBufs[i + 1] = cellBlock.get(i);
292        }
293      }
294      bc = new BufferChain(responseBufs);
295    } catch (IOException e) {
296      RpcServer.LOG.warn("Exception while creating response " + e);
297    }
298    this.response = bc;
299    // Once a response message is created and set to this.response, this Call can be treated as
300    // done. The Responder thread will do the n/w write of this message back to client.
301    if (this.rpcCallback != null) {
302      try (Scope ignored = span.makeCurrent()) {
303        this.rpcCallback.run();
304      } catch (Exception e) {
305        // Don't allow any exception here to kill this handler thread.
306        RpcServer.LOG.warn("Exception while running the Rpc Callback.", e);
307        TraceUtil.setError(span, e);
308      }
309    }
310  }
311
312  static void setExceptionResponse(Throwable t, String errorMsg,
313    ResponseHeader.Builder headerBuilder) {
314    ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
315    exceptionBuilder.setExceptionClassName(t.getClass().getName());
316    exceptionBuilder.setStackTrace(errorMsg);
317    exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
318    if (t instanceof RegionMovedException) {
319      // Special casing for this exception. This is only one carrying a payload.
320      // Do this instead of build a generic system for allowing exceptions carry
321      // any kind of payload.
322      RegionMovedException rme = (RegionMovedException) t;
323      exceptionBuilder.setHostname(rme.getHostname());
324      exceptionBuilder.setPort(rme.getPort());
325    } else if (t instanceof HBaseServerException) {
326      HBaseServerException hse = (HBaseServerException) t;
327      exceptionBuilder.setServerOverloaded(hse.isServerOverloaded());
328    }
329    // Set the exception as the result of the method invocation.
330    headerBuilder.setException(exceptionBuilder.build());
331  }
332
333  static ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize,
334    List<ByteBuffer> cellBlock) throws IOException {
335    // Organize the response as a set of bytebuffers rather than collect it all together inside
336    // one big byte array; save on allocations.
337    // for writing the header, we check if there is available space in the buffers
338    // created for the cellblock itself. If there is space for the header, we reuse
339    // the last buffer in the cellblock. This applies to the cellblock created from the
340    // pool or even the onheap cellblock buffer in case there is no pool enabled.
341    // Possible reuse would avoid creating a temporary array for storing the header every time.
342    ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
343    int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, resultVintSize = 0;
344    if (header != null) {
345      headerSerializedSize = header.getSerializedSize();
346      headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize);
347    }
348    if (result != null) {
349      resultSerializedSize = result.getSerializedSize();
350      resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize);
351    }
352    // calculate the total size
353    int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize)
354      + cellBlockSize;
355    int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + resultVintSize
356      + Bytes.SIZEOF_INT;
357    // Only if the last buffer has enough space for header use it. Else allocate
358    // a new buffer. Assume they are all flipped
359    if (possiblePBBuf != null && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
360      // duplicate the buffer. This is where the header is going to be written
361      ByteBuffer pbBuf = possiblePBBuf.duplicate();
362      // get the current limit
363      int limit = pbBuf.limit();
364      // Position such that we write the header to the end of the buffer
365      pbBuf.position(limit);
366      // limit to the header size
367      pbBuf.limit(totalPBSize + limit);
368      // mark the current position
369      pbBuf.mark();
370      writeToCOS(result, header, totalSize, pbBuf);
371      // reset the buffer back to old position
372      pbBuf.reset();
373      return pbBuf;
374    } else {
375      return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
376    }
377  }
378
379  private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
380    throws IOException {
381    ByteBufferUtils.putInt(pbBuf, totalSize);
382    // create COS that works on BB
383    CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
384    if (header != null) {
385      cos.writeMessageNoTag(header);
386    }
387    if (result != null) {
388      cos.writeMessageNoTag(result);
389    }
390    cos.flush();
391    cos.checkNoSpaceLeft();
392  }
393
394  private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
395    int totalSize, int totalPBSize) throws IOException {
396    ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
397    writeToCOS(result, header, totalSize, pbBuf);
398    pbBuf.flip();
399    return pbBuf;
400  }
401
402  protected BufferChain wrapWithSasl(BufferChain bc) throws IOException {
403    if (!this.connection.useSasl) {
404      return bc;
405    }
406    // Looks like no way around this; saslserver wants a byte array. I have to make it one.
407    // THIS IS A BIG UGLY COPY.
408    byte[] responseBytes = bc.getBytes();
409    byte[] token;
410    // synchronization may be needed since there can be multiple Handler
411    // threads using saslServer or Crypto AES to wrap responses.
412    if (connection.useCryptoAesWrap) {
413      // wrap with Crypto AES
414      synchronized (connection.cryptoAES) {
415        token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length);
416      }
417    } else {
418      synchronized (connection.saslServer) {
419        token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
420      }
421    }
422    if (RpcServer.LOG.isTraceEnabled()) {
423      RpcServer.LOG
424        .trace("Adding saslServer wrapped token of size " + token.length + " as call response.");
425    }
426
427    ByteBuffer[] responseBufs = new ByteBuffer[2];
428    responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
429    responseBufs[1] = ByteBuffer.wrap(token);
430    return new BufferChain(responseBufs);
431  }
432
433  @Override
434  public long disconnectSince() {
435    if (!this.connection.isConnectionOpen()) {
436      return EnvironmentEdgeManager.currentTime() - receiveTime;
437    } else {
438      return -1L;
439    }
440  }
441
442  @Override
443  public boolean isClientCellBlockSupported() {
444    return this.connection != null && this.connection.codec != null;
445  }
446
447  @Override
448  public long getResponseCellSize() {
449    return responseCellSize;
450  }
451
452  @Override
453  public void incrementResponseCellSize(long cellSize) {
454    responseCellSize += cellSize;
455  }
456
457  @Override
458  public long getResponseBlockSize() {
459    return responseBlockSize;
460  }
461
462  @Override
463  public void incrementResponseBlockSize(long blockSize) {
464    responseBlockSize += blockSize;
465  }
466
467  @Override
468  public long getResponseExceptionSize() {
469    return exceptionSize;
470  }
471
472  @Override
473  public void incrementResponseExceptionSize(long exSize) {
474    exceptionSize += exSize;
475  }
476
477  @Override
478  public long getSize() {
479    return this.size;
480  }
481
482  @Override
483  public long getDeadline() {
484    return deadline;
485  }
486
487  @Override
488  public Optional<User> getRequestUser() {
489    return Optional.ofNullable(user);
490  }
491
492  @Override
493  public InetAddress getRemoteAddress() {
494    return remoteAddress;
495  }
496
497  @Override
498  public VersionInfo getClientVersionInfo() {
499    return connection.getVersionInfo();
500  }
501
502  @Override
503  public synchronized void setCallBack(RpcCallback callback) {
504    this.rpcCallback = callback;
505  }
506
507  @Override
508  public boolean isRetryImmediatelySupported() {
509    return retryImmediatelySupported;
510  }
511
512  @Override
513  public BlockingService getService() {
514    return service;
515  }
516
517  @Override
518  public MethodDescriptor getMethod() {
519    return md;
520  }
521
522  @Override
523  public Message getParam() {
524    return param;
525  }
526
527  @Override
528  public CellScanner getCellScanner() {
529    return cellScanner;
530  }
531
532  @Override
533  public long getReceiveTime() {
534    return receiveTime;
535  }
536
537  @Override
538  public long getStartTime() {
539    return startTime;
540  }
541
542  @Override
543  public void setStartTime(long t) {
544    this.startTime = t;
545  }
546
547  @Override
548  public int getTimeout() {
549    return timeout;
550  }
551
552  @Override
553  public int getRemotePort() {
554    return connection.getRemotePort();
555  }
556
557  @Override
558  public synchronized BufferChain getResponse() {
559    if (connection.useWrap) {
560      /*
561       * wrapping result with SASL as the last step just before sending it out, so every message
562       * must have the right increasing sequence number
563       */
564      try {
565        return wrapWithSasl(response);
566      } catch (IOException e) {
567        /* it is exactly the same what setResponse() does */
568        RpcServer.LOG.warn("Exception while creating response " + e);
569        return null;
570      }
571    } else {
572      return response;
573    }
574  }
575
576  @RestrictedApi(explanation = "Should only be called in tests", link = "",
577      allowedOnPath = ".*/src/test/.*")
578  public synchronized RpcCallback getCallBack() {
579    return this.rpcCallback;
580  }
581}