001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to you under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.hadoop.hbase.client;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021
022import org.apache.hadoop.hbase.DoNotRetryIOException;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.TableName;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
027import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
028
029/**
030 * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use
031 * AsyncProcess directly though this class. Also adds global timeout tracking on top of
032 * RegionServerCallable and implements Cancellable.
033 * Global timeout tracking conflicts with logic in RpcRetryingCallerImpl's callWithRetries. So you
034 * can only use this callable in AsyncProcess which only uses callWithoutRetries and retries in its
035 * own implementation.
036 */
037@InterfaceAudience.Private
038abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<T> implements
039    Cancellable {
040  private final RetryingTimeTracker tracker;
041  private final int rpcTimeout;
042  CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
043      RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker, int priority) {
044    super(connection, tableName, row, rpcController, priority);
045    this.rpcTimeout = rpcTimeout;
046    this.tracker = tracker;
047  }
048
049  /* Override so can mess with the callTimeout.
050   * (non-Javadoc)
051   * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int)
052   */
053  @Override
054  public T call(int operationTimeout) throws IOException {
055    if (isCancelled()) return null;
056    if (Thread.interrupted()) {
057      throw new InterruptedIOException();
058    }
059    // It is expected (it seems) that tracker.start can be called multiple times (on each trip
060    // through the call when retrying). Also, we can call start and no need of a stop.
061    this.tracker.start();
062    int remainingTime = tracker.getRemainingTime(operationTimeout);
063    if (remainingTime <= 1) {
064      // "1" is a special return value in RetryingTimeTracker, see its implementation.
065      throw new DoNotRetryIOException("Operation rpcTimeout");
066    }
067    return super.call(Math.min(rpcTimeout, remainingTime));
068  }
069
070  @Override
071  public void prepare(boolean reload) throws IOException {
072    if (isCancelled()) return;
073    if (Thread.interrupted()) {
074      throw new InterruptedIOException();
075    }
076    super.prepare(reload);
077  }
078
079  @Override
080  protected void setStubByServiceName(ServerName serviceName) throws IOException {
081    setStub(getConnection().getClient(serviceName));
082  }
083
084  @Override
085  public void cancel() {
086    getRpcController().startCancel();
087  }
088
089  @Override
090  public boolean isCancelled() {
091    return getRpcController().isCanceled();
092  }
093
094  protected ClientProtos.MultiResponse doMulti(ClientProtos.MultiRequest request)
095  throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
096    return getStub().multi(getRpcController(), request);
097  }
098
099  protected ClientProtos.ScanResponse doScan(ClientProtos.ScanRequest request)
100  throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
101    return getStub().scan(getRpcController(), request);
102  }
103
104  protected ClientProtos.PrepareBulkLoadResponse doPrepareBulkLoad(
105      ClientProtos.PrepareBulkLoadRequest request)
106  throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
107    return getStub().prepareBulkLoad(getRpcController(), request);
108  }
109
110  protected ClientProtos.BulkLoadHFileResponse doBulkLoadHFile(
111      ClientProtos.BulkLoadHFileRequest request)
112  throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
113    return getStub().bulkLoadHFile(getRpcController(), request);
114  }
115
116  protected ClientProtos.CleanupBulkLoadResponse doCleanupBulkLoad(
117      ClientProtos.CleanupBulkLoadRequest request)
118  throws org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
119    return getStub().cleanupBulkLoad(getRpcController(), request);
120  }
121}