001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.client;
021
022import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
026
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.TimeUnit;
032import java.util.function.Consumer;
033import java.util.function.Supplier;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.NotServingRegionException;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.TableNotEnabledException;
038import org.apache.hadoop.hbase.TableNotFoundException;
039import org.apache.hadoop.hbase.exceptions.ScannerResetException;
040import org.apache.hadoop.hbase.ipc.HBaseRpcController;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.hadoop.hbase.util.FutureUtils;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.io.netty.util.Timer;
048
049@InterfaceAudience.Private
050public abstract class AsyncRpcRetryingCaller<T> {
051
052  private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class);
053
054  private final Timer retryTimer;
055
056  private final long startNs;
057
058  private final long pauseNs;
059
060  private int tries = 1;
061
062  private final int maxAttempts;
063
064  private final int startLogErrorsCnt;
065
066  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
067
068  private final long rpcTimeoutNs;
069
070  protected final long operationTimeoutNs;
071
072  protected final AsyncConnectionImpl conn;
073
074  protected final CompletableFuture<T> future;
075
076  protected final HBaseRpcController controller;
077
078  public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs,
079      int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
080    this.retryTimer = retryTimer;
081    this.conn = conn;
082    this.pauseNs = pauseNs;
083    this.maxAttempts = maxAttempts;
084    this.operationTimeoutNs = operationTimeoutNs;
085    this.rpcTimeoutNs = rpcTimeoutNs;
086    this.startLogErrorsCnt = startLogErrorsCnt;
087    this.future = new CompletableFuture<>();
088    this.controller = conn.rpcControllerFactory.newController();
089    this.exceptions = new ArrayList<>();
090    this.startNs = System.nanoTime();
091  }
092
093  private long elapsedMs() {
094    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
095  }
096
097  protected final long remainingTimeNs() {
098    return operationTimeoutNs - (System.nanoTime() - startNs);
099  }
100
101  protected final void completeExceptionally() {
102    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
103  }
104
105  protected final void resetCallTimeout() {
106    long callTimeoutNs;
107    if (operationTimeoutNs > 0) {
108      callTimeoutNs = remainingTimeNs();
109      if (callTimeoutNs <= 0) {
110        completeExceptionally();
111        return;
112      }
113      callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
114    } else {
115      callTimeoutNs = rpcTimeoutNs;
116    }
117    resetController(controller, callTimeoutNs);
118  }
119
120  private void tryScheduleRetry(Throwable error) {
121    long delayNs;
122    if (operationTimeoutNs > 0) {
123      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
124      if (maxDelayNs <= 0) {
125        completeExceptionally();
126        return;
127      }
128      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
129    } else {
130      delayNs = getPauseTime(pauseNs, tries - 1);
131    }
132    tries++;
133    retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
134  }
135
136  protected Optional<TableName> getTableName() {
137    return Optional.empty();
138  }
139
140  protected final void onError(Throwable t, Supplier<String> errMsg,
141      Consumer<Throwable> updateCachedLocation) {
142    if (future.isDone()) {
143      // Give up if the future is already done, this is possible if user has already canceled the
144      // future. And for timeline consistent read, we will also cancel some requests if we have
145      // already get one of the responses.
146      LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
147      return;
148    }
149    Throwable error = translateException(t);
150    // We use this retrying caller to open a scanner, as it is idempotent, but we may throw
151    // ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will
152    // also fetch data when opening a scanner. The intention here is that if we hit a
153    // ScannerResetException when scanning then we should try to open a new scanner, instead of
154    // retrying on the old one, so it is declared as a DoNotRetryIOException. But here we are
155    // exactly trying to open a new scanner, so we should retry on ScannerResetException.
156    if (error instanceof DoNotRetryIOException && !(error instanceof ScannerResetException)) {
157      future.completeExceptionally(error);
158      return;
159    }
160    if (tries > startLogErrorsCnt) {
161      LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts +
162        ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) +
163        " ms, time elapsed = " + elapsedMs() + " ms", error);
164    }
165    updateCachedLocation.accept(error);
166    RetriesExhaustedException.ThrowableWithExtraContext qt =
167      new RetriesExhaustedException.ThrowableWithExtraContext(error,
168        EnvironmentEdgeManager.currentTime(), "");
169    exceptions.add(qt);
170    if (tries >= maxAttempts) {
171      completeExceptionally();
172      return;
173    }
174    // check whether the table has been disabled, notice that the check will introduce a request to
175    // meta, so here we only check for disabled for some specific exception types.
176    if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) {
177      Optional<TableName> tableName = getTableName();
178      if (tableName.isPresent()) {
179        FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName.get()), (disabled, e) -> {
180          if (e != null) {
181            if (e instanceof TableNotFoundException) {
182              future.completeExceptionally(e);
183            } else {
184              // failed to test whether the table is disabled, not a big deal, continue retrying
185              tryScheduleRetry(error);
186            }
187            return;
188          }
189          if (disabled) {
190            future.completeExceptionally(new TableNotEnabledException(tableName.get()));
191          } else {
192            tryScheduleRetry(error);
193          }
194        });
195      } else {
196        tryScheduleRetry(error);
197      }
198    } else {
199      tryScheduleRetry(error);
200    }
201  }
202
203  protected abstract void doCall();
204
205  CompletableFuture<T> call() {
206    doCall();
207    return future;
208  }
209}