001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.InetAddress;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Optional;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.hbase.CellScanner;
028import org.apache.hadoop.hbase.DoNotRetryIOException;
029import org.apache.hadoop.hbase.io.ByteBuffAllocator;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.hadoop.hbase.exceptions.RegionMovedException;
032import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
033import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
034import org.apache.hadoop.hbase.security.User;
035import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
036import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
037import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
038import org.apache.hbase.thirdparty.com.google.protobuf.Message;
039import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
045import org.apache.hadoop.hbase.util.ByteBufferUtils;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.util.StringUtils;
048
049/**
050 * Datastructure that holds all necessary to a method invocation and then afterward, carries
051 * the result.
052 */
053@InterfaceAudience.Private
054public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
055
056  protected final int id;                             // the client's call id
057  protected final BlockingService service;
058  protected final MethodDescriptor md;
059  protected final RequestHeader header;
060  protected Message param;                      // the parameter passed
061  // Optional cell data passed outside of protobufs.
062  protected final CellScanner cellScanner;
063  protected final T connection;              // connection to client
064  protected final long receiveTime;      // the time received when response is null
065                                 // the time served when response is not null
066  protected final int timeout;
067  protected long startTime;
068  protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
069
070  protected final ByteBuffAllocator bbAllocator;
071
072  protected final CellBlockBuilder cellBlockBuilder;
073
074  /**
075   * Chain of buffers to send as response.
076   */
077  protected BufferChain response;
078
079  protected final long size;                          // size of current call
080  protected boolean isError;
081  protected ByteBufferListOutputStream cellBlockStream = null;
082  protected CallCleanup reqCleanup = null;
083
084  protected final User user;
085  protected final InetAddress remoteAddress;
086  protected RpcCallback rpcCallback;
087
088  private long responseCellSize = 0;
089  private long responseBlockSize = 0;
090  // cumulative size of serialized exceptions
091  private long exceptionSize = 0;
092  private final boolean retryImmediatelySupported;
093
094  // This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
095  // second bit is for WAL reference. We can only call release if both of them are zero. The reason
096  // why we can not use a general reference counting is that, we may call cleanup multiple times in
097  // the current implementation. We should fix this in the future.
098  private final AtomicInteger reference = new AtomicInteger(0b01);
099
100  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
101      justification = "Can't figure why this complaint is happening... see below")
102  ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
103      Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress,
104      long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator,
105      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
106    this.id = id;
107    this.service = service;
108    this.md = md;
109    this.header = header;
110    this.param = param;
111    this.cellScanner = cellScanner;
112    this.connection = connection;
113    this.receiveTime = receiveTime;
114    this.response = null;
115    this.isError = false;
116    this.size = size;
117    if (connection != null) {
118      this.user =  connection.user;
119      this.retryImmediatelySupported = connection.retryImmediatelySupported;
120    } else {
121      this.user = null;
122      this.retryImmediatelySupported = false;
123    }
124    this.remoteAddress = remoteAddress;
125    this.timeout = timeout;
126    this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
127    this.bbAllocator = byteBuffAllocator;
128    this.cellBlockBuilder = cellBlockBuilder;
129    this.reqCleanup = reqCleanup;
130  }
131
132  /**
133   * Call is done. Execution happened and we returned results to client. It is
134   * now safe to cleanup.
135   */
136  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
137      justification = "Presume the lock on processing request held by caller is protection enough")
138  @Override
139  public void done() {
140    if (this.cellBlockStream != null) {
141      // This will return back the BBs which we got from pool.
142      this.cellBlockStream.releaseResources();
143      this.cellBlockStream = null;
144    }
145    // If the call was run successfuly, we might have already returned the BB
146    // back to pool. No worries..Then inputCellBlock will be null
147    cleanup();
148  }
149
150  private void release(int mask) {
151    for (;;) {
152      int ref = reference.get();
153      if ((ref & mask) == 0) {
154        return;
155      }
156      int nextRef = ref & (~mask);
157      if (reference.compareAndSet(ref, nextRef)) {
158        if (nextRef == 0) {
159          if (this.reqCleanup != null) {
160            this.reqCleanup.run();
161          }
162        }
163        return;
164      }
165    }
166  }
167
168  @Override
169  public void cleanup() {
170    release(0b01);
171  }
172
173  public void retainByWAL() {
174    for (;;) {
175      int ref = reference.get();
176      int nextRef = ref | 0b10;
177      if (reference.compareAndSet(ref, nextRef)) {
178        return;
179      }
180    }
181  }
182
183  public void releaseByWAL() {
184    release(0b10);
185  }
186
187  @Override
188  public String toString() {
189    return toShortString() + " param: " +
190      (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
191      " connection: " + connection.toString();
192  }
193
194  @Override
195  public RequestHeader getHeader() {
196    return this.header;
197  }
198
199  @Override
200  public int getPriority() {
201    return this.header.getPriority();
202  }
203
204  /*
205   * Short string representation without param info because param itself could be huge depends on
206   * the payload of a command
207   */
208  @Override
209  public String toShortString() {
210    String serviceName = this.connection.service != null ?
211        this.connection.service.getDescriptorForType().getName() : "null";
212    return "callId: " + this.id + " service: " + serviceName +
213        " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
214        " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
215        " connection: " + connection.toString() +
216        " deadline: " + deadline;
217  }
218
219  @Override
220  public synchronized void setResponse(Message m, final CellScanner cells,
221      Throwable t, String errorMsg) {
222    if (this.isError) return;
223    if (t != null) this.isError = true;
224    BufferChain bc = null;
225    try {
226      ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
227      // Call id.
228      headerBuilder.setCallId(this.id);
229      if (t != null) {
230        setExceptionResponse(t, errorMsg, headerBuilder);
231      }
232      // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
233      // reservoir when finished. This is hacky and the hack is not contained but benefits are
234      // high when we can avoid a big buffer allocation on each rpc.
235      List<ByteBuffer> cellBlock = null;
236      int cellBlockSize = 0;
237      if (bbAllocator.isReservoirEnabled()) {
238        this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
239          this.connection.compressionCodec, cells, bbAllocator);
240        if (this.cellBlockStream != null) {
241          cellBlock = this.cellBlockStream.getByteBuffers();
242          cellBlockSize = this.cellBlockStream.size();
243        }
244      } else {
245        ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec,
246          this.connection.compressionCodec, cells);
247        if (b != null) {
248          cellBlockSize = b.remaining();
249          cellBlock = new ArrayList<>(1);
250          cellBlock.add(b);
251        }
252      }
253
254      if (cellBlockSize > 0) {
255        CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
256        // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
257        cellBlockBuilder.setLength(cellBlockSize);
258        headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
259      }
260      Message header = headerBuilder.build();
261      ByteBuffer headerBuf =
262          createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
263      ByteBuffer[] responseBufs = null;
264      int cellBlockBufferSize = 0;
265      if (cellBlock != null) {
266        cellBlockBufferSize = cellBlock.size();
267        responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
268      } else {
269        responseBufs = new ByteBuffer[1];
270      }
271      responseBufs[0] = headerBuf;
272      if (cellBlock != null) {
273        for (int i = 0; i < cellBlockBufferSize; i++) {
274          responseBufs[i + 1] = cellBlock.get(i);
275        }
276      }
277      bc = new BufferChain(responseBufs);
278      if (connection.useWrap) {
279        bc = wrapWithSasl(bc);
280      }
281    } catch (IOException e) {
282      RpcServer.LOG.warn("Exception while creating response " + e);
283    }
284    this.response = bc;
285    // Once a response message is created and set to this.response, this Call can be treated as
286    // done. The Responder thread will do the n/w write of this message back to client.
287    if (this.rpcCallback != null) {
288      try {
289        this.rpcCallback.run();
290      } catch (Exception e) {
291        // Don't allow any exception here to kill this handler thread.
292        RpcServer.LOG.warn("Exception while running the Rpc Callback.", e);
293      }
294    }
295  }
296
297  static void setExceptionResponse(Throwable t, String errorMsg,
298      ResponseHeader.Builder headerBuilder) {
299    ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
300    exceptionBuilder.setExceptionClassName(t.getClass().getName());
301    exceptionBuilder.setStackTrace(errorMsg);
302    exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
303    if (t instanceof RegionMovedException) {
304      // Special casing for this exception.  This is only one carrying a payload.
305      // Do this instead of build a generic system for allowing exceptions carry
306      // any kind of payload.
307      RegionMovedException rme = (RegionMovedException)t;
308      exceptionBuilder.setHostname(rme.getHostname());
309      exceptionBuilder.setPort(rme.getPort());
310    }
311    // Set the exception as the result of the method invocation.
312    headerBuilder.setException(exceptionBuilder.build());
313  }
314
315  static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
316      int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
317    // Organize the response as a set of bytebuffers rather than collect it all together inside
318    // one big byte array; save on allocations.
319    // for writing the header, we check if there is available space in the buffers
320    // created for the cellblock itself. If there is space for the header, we reuse
321    // the last buffer in the cellblock. This applies to the cellblock created from the
322    // pool or even the onheap cellblock buffer in case there is no pool enabled.
323    // Possible reuse would avoid creating a temporary array for storing the header every time.
324    ByteBuffer possiblePBBuf =
325        (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
326    int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
327        resultVintSize = 0;
328    if (header != null) {
329      headerSerializedSize = header.getSerializedSize();
330      headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize);
331    }
332    if (result != null) {
333      resultSerializedSize = result.getSerializedSize();
334      resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize);
335    }
336    // calculate the total size
337    int totalSize = headerSerializedSize + headerVintSize
338        + (resultSerializedSize + resultVintSize)
339        + cellBlockSize;
340    int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
341        + resultVintSize + Bytes.SIZEOF_INT;
342    // Only if the last buffer has enough space for header use it. Else allocate
343    // a new buffer. Assume they are all flipped
344    if (possiblePBBuf != null
345        && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
346      // duplicate the buffer. This is where the header is going to be written
347      ByteBuffer pbBuf = possiblePBBuf.duplicate();
348      // get the current limit
349      int limit = pbBuf.limit();
350      // Position such that we write the header to the end of the buffer
351      pbBuf.position(limit);
352      // limit to the header size
353      pbBuf.limit(totalPBSize + limit);
354      // mark the current position
355      pbBuf.mark();
356      writeToCOS(result, header, totalSize, pbBuf);
357      // reset the buffer back to old position
358      pbBuf.reset();
359      return pbBuf;
360    } else {
361      return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
362    }
363  }
364
365  private static void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
366      throws IOException {
367    ByteBufferUtils.putInt(pbBuf, totalSize);
368    // create COS that works on BB
369    CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
370    if (header != null) {
371      cos.writeMessageNoTag(header);
372    }
373    if (result != null) {
374      cos.writeMessageNoTag(result);
375    }
376    cos.flush();
377    cos.checkNoSpaceLeft();
378  }
379
380  private static ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
381      int totalSize, int totalPBSize) throws IOException {
382    ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
383    writeToCOS(result, header, totalSize, pbBuf);
384    pbBuf.flip();
385    return pbBuf;
386  }
387
388  protected BufferChain wrapWithSasl(BufferChain bc)
389      throws IOException {
390    if (!this.connection.useSasl) return bc;
391    // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
392    // THIS IS A BIG UGLY COPY.
393    byte [] responseBytes = bc.getBytes();
394    byte [] token;
395    // synchronization may be needed since there can be multiple Handler
396    // threads using saslServer or Crypto AES to wrap responses.
397    if (connection.useCryptoAesWrap) {
398      // wrap with Crypto AES
399      synchronized (connection.cryptoAES) {
400        token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length);
401      }
402    } else {
403      synchronized (connection.saslServer) {
404        token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
405      }
406    }
407    if (RpcServer.LOG.isTraceEnabled()) {
408      RpcServer.LOG.trace("Adding saslServer wrapped token of size " + token.length
409          + " as call response.");
410    }
411
412    ByteBuffer[] responseBufs = new ByteBuffer[2];
413    responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
414    responseBufs[1] = ByteBuffer.wrap(token);
415    return new BufferChain(responseBufs);
416  }
417
418  @Override
419  public long disconnectSince() {
420    if (!this.connection.isConnectionOpen()) {
421      return System.currentTimeMillis() - receiveTime;
422    } else {
423      return -1L;
424    }
425  }
426
427  @Override
428  public boolean isClientCellBlockSupported() {
429    return this.connection != null && this.connection.codec != null;
430  }
431
432  @Override
433  public long getResponseCellSize() {
434    return responseCellSize;
435  }
436
437  @Override
438  public void incrementResponseCellSize(long cellSize) {
439    responseCellSize += cellSize;
440  }
441
442  @Override
443  public long getResponseBlockSize() {
444    return responseBlockSize;
445  }
446
447  @Override
448  public void incrementResponseBlockSize(long blockSize) {
449    responseBlockSize += blockSize;
450  }
451
452  @Override
453  public long getResponseExceptionSize() {
454    return exceptionSize;
455  }
456  @Override
457  public void incrementResponseExceptionSize(long exSize) {
458    exceptionSize += exSize;
459  }
460
461  @Override
462  public long getSize() {
463    return this.size;
464  }
465
466  @Override
467  public long getDeadline() {
468    return deadline;
469  }
470
471  @Override
472  public Optional<User> getRequestUser() {
473    return Optional.ofNullable(user);
474  }
475
476  @Override
477  public InetAddress getRemoteAddress() {
478    return remoteAddress;
479  }
480
481  @Override
482  public VersionInfo getClientVersionInfo() {
483    return connection.getVersionInfo();
484  }
485
486  @Override
487  public synchronized void setCallBack(RpcCallback callback) {
488    this.rpcCallback = callback;
489  }
490
491  @Override
492  public boolean isRetryImmediatelySupported() {
493    return retryImmediatelySupported;
494  }
495
496  @Override
497  public BlockingService getService() {
498    return service;
499  }
500
501  @Override
502  public MethodDescriptor getMethod() {
503    return md;
504  }
505
506  @Override
507  public Message getParam() {
508    return param;
509  }
510
511  @Override
512  public CellScanner getCellScanner() {
513    return cellScanner;
514  }
515
516  @Override
517  public long getReceiveTime() {
518    return receiveTime;
519  }
520
521  @Override
522  public long getStartTime() {
523    return startTime;
524  }
525
526  @Override
527  public void setStartTime(long t) {
528    this.startTime = t;
529  }
530
531  @Override
532  public int getTimeout() {
533    return timeout;
534  }
535
536  @Override
537  public int getRemotePort() {
538    return connection.getRemotePort();
539  }
540
541  @Override
542  public synchronized BufferChain getResponse() {
543    return response;
544  }
545}