001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
029
030import io.opentelemetry.context.Context;
031import io.opentelemetry.context.Scope;
032import java.io.IOException;
033import java.util.ArrayList;
034import java.util.List;
035import java.util.Map;
036import java.util.Optional;
037import java.util.OptionalLong;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.TimeUnit;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HBaseServerException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.HRegionLocation;
044import org.apache.hadoop.hbase.NotServingRegionException;
045import org.apache.hadoop.hbase.UnknownScannerException;
046import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
047import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
048import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
049import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
050import org.apache.hadoop.hbase.exceptions.ScannerResetException;
051import org.apache.hadoop.hbase.ipc.HBaseRpcController;
052import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
059import org.apache.hbase.thirdparty.io.netty.util.Timeout;
060import org.apache.hbase.thirdparty.io.netty.util.Timer;
061
062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
063import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
064import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
069
070/**
071 * Retry caller for scanning a region.
072 * <p>
073 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
074 * reference of this object and use it to open new single region scanners.
075 */
076@InterfaceAudience.Private
077class AsyncScanSingleRegionRpcRetryingCaller {
078
079  private static final Logger LOG =
080    LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
081
082  private final Timer retryTimer;
083
084  private final Scan scan;
085
086  private final ScanMetrics scanMetrics;
087
088  private final long scannerId;
089
090  private final ScanResultCache resultCache;
091
092  private final AdvancedScanResultConsumer consumer;
093
094  private final ClientService.Interface stub;
095
096  private final HRegionLocation loc;
097
098  private final boolean regionServerRemote;
099
100  private final int priority;
101
102  private final long scannerLeaseTimeoutPeriodNs;
103
104  private final int maxAttempts;
105
106  private final long scanTimeoutNs;
107
108  private final long rpcTimeoutNs;
109
110  private final int startLogErrorsCnt;
111
112  private final Runnable completeWhenNoMoreResultsInRegion;
113
114  private final AsyncConnectionImpl conn;
115
116  private final CompletableFuture<Boolean> future;
117
118  private final HBaseRpcController controller;
119
120  private byte[] nextStartRowWhenError;
121
122  private boolean includeNextStartRowWhenError;
123
124  private long lastNextCallNanos;
125
126  private long nextCallStartNanos;
127
128  private int tries;
129
130  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
131
132  private long nextCallSeq = -1L;
133
134  private final HBaseServerExceptionPauseManager pauseManager;
135
136  private enum ScanControllerState {
137    INITIALIZED,
138    SUSPENDED,
139    TERMINATED,
140    DESTROYED
141  }
142
143  // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
144  // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
145  // usage. We use two things to prevent invalid usage:
146  // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an
147  // IllegalStateException if the caller thread is not this thread.
148  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
149  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
150  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
151  // call destroy to get the current state and set the state to DESTROYED. And when user calls
152  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
153  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
154  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
155  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
156  // to be used in the future.
157  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
158  // package private methods can only be called within the implementation of
159  // AsyncScanSingleRegionRpcRetryingCaller.
160  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
161
162    // Make sure the methods are only called in this thread.
163    private final Thread callerThread;
164
165    private final Optional<Cursor> cursor;
166
167    // INITIALIZED -> SUSPENDED -> DESTROYED
168    // INITIALIZED -> TERMINATED -> DESTROYED
169    // INITIALIZED -> DESTROYED
170    // If the state is incorrect we will throw IllegalStateException.
171    private ScanControllerState state = ScanControllerState.INITIALIZED;
172
173    private ScanResumerImpl resumer;
174
175    public ScanControllerImpl(Optional<Cursor> cursor) {
176      this.callerThread = Thread.currentThread();
177      this.cursor = cursor;
178    }
179
180    private void preCheck() {
181      Preconditions.checkState(Thread.currentThread() == callerThread,
182        "The current thread is %s, expected thread is %s, "
183          + "you should not call this method outside onNext or onHeartbeat",
184        Thread.currentThread(), callerThread);
185      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
186        "Invalid Stopper state %s", state);
187    }
188
189    @Override
190    public ScanResumer suspend() {
191      preCheck();
192      state = ScanControllerState.SUSPENDED;
193      ScanResumerImpl resumer = new ScanResumerImpl();
194      this.resumer = resumer;
195      return resumer;
196    }
197
198    @Override
199    public void terminate() {
200      preCheck();
201      state = ScanControllerState.TERMINATED;
202    }
203
204    // return the current state, and set the state to DESTROYED.
205    ScanControllerState destroy() {
206      ScanControllerState state = this.state;
207      this.state = ScanControllerState.DESTROYED;
208      return state;
209    }
210
211    @Override
212    public Optional<Cursor> cursor() {
213      return cursor;
214    }
215  }
216
217  private enum ScanResumerState {
218    INITIALIZED,
219    SUSPENDED,
220    RESUMED
221  }
222
223  // The resume method is allowed to be called in another thread so here we also use the
224  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
225  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
226  // and when user calls resume method, we will change the state to RESUMED. But the resume method
227  // could be called in other thread, and in fact, user could just do this:
228  // controller.suspend().resume()
229  // This is strange but valid. This means the scan could be resumed before we call the prepare
230  // method to do the actual suspend work. So in the resume method, we will check if the state is
231  // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
232  // method, if the state is RESUMED already, we will just return an let the scan go on.
233  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
234  // package private methods can only be called within the implementation of
235  // AsyncScanSingleRegionRpcRetryingCaller.
236  @InterfaceAudience.Private
237  final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
238
239    // INITIALIZED -> SUSPENDED -> RESUMED
240    // INITIALIZED -> RESUMED
241    private ScanResumerState state = ScanResumerState.INITIALIZED;
242
243    private ScanResponse resp;
244
245    private int numberOfCompleteRows;
246
247    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
248    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
249    // every time when the previous task is finished. There could also be race as the renewal is
250    // executed in the timer thread, so we also need to check the state before lease renewal. If the
251    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
252    // renewal task.
253    private Timeout leaseRenewer;
254
255    @Override
256    public void resume() {
257      doResume(false);
258    }
259
260    /**
261     * This method is used when {@link ScanControllerImpl#suspend} had ever been called to get a
262     * {@link ScanResumerImpl}, but now user stops scan and does not need any more scan results.
263     */
264    public void terminate() {
265      doResume(true);
266    }
267
268    private void doResume(boolean stopScan) {
269      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
270      // just return at the first if condition without loading the resp and numValidResuls field. If
271      // resume is called after suspend, then it is also safe to just reference resp and
272      // numValidResults after the synchronized block as no one will change it anymore.
273      ScanResponse localResp;
274      int localNumberOfCompleteRows;
275      synchronized (this) {
276        if (state == ScanResumerState.INITIALIZED) {
277          // user calls this method before we call prepare, so just set the state to
278          // RESUMED, the implementation will just go on.
279          state = ScanResumerState.RESUMED;
280          return;
281        }
282        if (state == ScanResumerState.RESUMED) {
283          // already resumed, give up.
284          return;
285        }
286        state = ScanResumerState.RESUMED;
287        if (leaseRenewer != null) {
288          leaseRenewer.cancel();
289        }
290        localResp = this.resp;
291        localNumberOfCompleteRows = this.numberOfCompleteRows;
292      }
293      if (stopScan) {
294        stopScan(localResp);
295      } else {
296        completeOrNext(localResp, localNumberOfCompleteRows);
297      }
298    }
299
300    private void scheduleRenewLeaseTask() {
301      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
302        TimeUnit.NANOSECONDS);
303    }
304
305    private synchronized void tryRenewLease() {
306      // the scan has already been resumed, give up
307      if (state == ScanResumerState.RESUMED) {
308        return;
309      }
310      renewLease();
311      // schedule the next renew lease task again as this is a one-time task.
312      scheduleRenewLeaseTask();
313    }
314
315    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
316    // for more details.
317    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
318      if (state == ScanResumerState.RESUMED) {
319        // user calls resume before we actually suspend the scan, just continue;
320        return false;
321      }
322      state = ScanResumerState.SUSPENDED;
323      this.resp = resp;
324      this.numberOfCompleteRows = numberOfCompleteRows;
325      // if there are no more results in region then the scanner at RS side will be closed
326      // automatically so we do not need to renew lease.
327      if (resp.getMoreResultsInRegion()) {
328        // schedule renew lease task
329        scheduleRenewLeaseTask();
330      }
331      return true;
332    }
333  }
334
335  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
336    Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
337    AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
338    boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
339    long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
340    long lastNextCallNanos, int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
341    this.retryTimer = retryTimer;
342    this.conn = conn;
343    this.scan = scan;
344    this.scanMetrics = scanMetrics;
345    this.scannerId = scannerId;
346    this.resultCache = resultCache;
347    this.consumer = consumer;
348    this.stub = stub;
349    this.loc = loc;
350    this.regionServerRemote = isRegionServerRemote;
351    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
352    this.maxAttempts = maxAttempts;
353    this.scanTimeoutNs = scanTimeoutNs;
354    this.rpcTimeoutNs = rpcTimeoutNs;
355    this.lastNextCallNanos = lastNextCallNanos;
356    this.startLogErrorsCnt = startLogErrorsCnt;
357    if (scan.isReversed()) {
358      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
359    } else {
360      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
361    }
362    this.future = new CompletableFuture<>();
363    this.priority = priority;
364    this.controller = conn.rpcControllerFactory.newController();
365    this.controller.setRequestAttributes(requestAttributes);
366    this.exceptions = new ArrayList<>();
367    this.pauseManager =
368      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
369  }
370
371  public long getLastNextCallNanos() {
372    return lastNextCallNanos;
373  }
374
375  private long elapsedMs() {
376    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNanos);
377  }
378
379  private void closeScanner() {
380    incRPCCallsMetrics(scanMetrics, regionServerRemote);
381    resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS, loc.getRegion().getTable());
382    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
383    stub.scan(controller, req, resp -> {
384      if (controller.failed()) {
385        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
386          + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
387          + " failed, ignore, probably already closed", controller.getFailed());
388      }
389    });
390  }
391
392  private void completeExceptionally(boolean closeScanner) {
393    resultCache.clear();
394    if (closeScanner) {
395      closeScanner();
396    }
397    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
398  }
399
400  private void completeNoMoreResults() {
401    future.complete(false);
402  }
403
404  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
405    scan.withStartRow(row, inclusive);
406    future.complete(true);
407  }
408
409  private void completeWhenError(boolean closeScanner) {
410    incRPCRetriesMetrics(scanMetrics, closeScanner);
411    resultCache.clear();
412    if (closeScanner) {
413      closeScanner();
414    }
415    if (nextStartRowWhenError != null) {
416      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
417    }
418    future.complete(true);
419  }
420
421  private void onError(Throwable error) {
422    error = translateException(error);
423    if (tries > startLogErrorsCnt) {
424      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
425        + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
426        + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
427        + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
428        + " ms", error);
429    }
430    boolean scannerClosed =
431      error instanceof UnknownScannerException || error instanceof NotServingRegionException
432        || error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
433    RetriesExhaustedException.ThrowableWithExtraContext qt =
434      new RetriesExhaustedException.ThrowableWithExtraContext(error,
435        EnvironmentEdgeManager.currentTime(), "");
436    exceptions.add(qt);
437    if (tries >= maxAttempts) {
438      completeExceptionally(!scannerClosed);
439      return;
440    }
441
442    OptionalLong maybePauseNsToUse =
443      pauseManager.getPauseNsFromException(error, tries, nextCallStartNanos);
444    if (!maybePauseNsToUse.isPresent()) {
445      completeExceptionally(!scannerClosed);
446      return;
447    }
448    long delayNs = maybePauseNsToUse.getAsLong();
449    if (scannerClosed) {
450      completeWhenError(false);
451      return;
452    }
453    if (error instanceof OutOfOrderScannerNextException) {
454      completeWhenError(true);
455      return;
456    }
457    if (error instanceof DoNotRetryIOException) {
458      completeExceptionally(true);
459      return;
460    }
461    tries++;
462
463    if (HBaseServerException.isServerOverloaded(error)) {
464      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
465      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
466    }
467    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
468  }
469
470  private void updateNextStartRowWhenError(Result result) {
471    nextStartRowWhenError = result.getRow();
472    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
473  }
474
475  private void completeWhenNoMoreResultsInRegion() {
476    if (noMoreResultsForScan(scan, loc.getRegion())) {
477      completeNoMoreResults();
478    } else {
479      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
480    }
481  }
482
483  private void completeReversedWhenNoMoreResultsInRegion() {
484    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
485      completeNoMoreResults();
486    } else {
487      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
488    }
489  }
490
491  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
492    if (resp.hasMoreResults() && !resp.getMoreResults()) {
493      // RS tells us there is no more data for the whole scan
494      completeNoMoreResults();
495      return;
496    }
497    if (scan.getLimit() > 0) {
498      // The RS should have set the moreResults field in ScanResponse to false when we have reached
499      // the limit, so we add an assert here.
500      int newLimit = scan.getLimit() - numberOfCompleteRows;
501      assert newLimit > 0;
502      scan.setLimit(newLimit);
503    }
504    // as in 2.0 this value will always be set
505    if (!resp.getMoreResultsInRegion()) {
506      completeWhenNoMoreResultsInRegion.run();
507      return;
508    }
509    next();
510  }
511
512  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
513    if (controller.failed()) {
514      onError(controller.getFailed());
515      return;
516    }
517    updateServerSideMetrics(scanMetrics, resp);
518    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
519    Result[] rawResults;
520    Result[] results;
521    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
522    try {
523      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
524      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
525      results = resultCache.addAndGet(
526        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
527        isHeartbeatMessage);
528    } catch (IOException e) {
529      // We can not retry here. The server has responded normally and the call sequence has been
530      // increased so a new scan with the same call sequence will cause an
531      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
532      LOG.warn("decode scan response failed", e);
533      completeWhenError(true);
534      return;
535    }
536
537    ScanControllerImpl scanController;
538    if (results.length > 0) {
539      scanController = new ScanControllerImpl(
540        resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty());
541      updateNextStartRowWhenError(results[results.length - 1]);
542      consumer.onNext(results, scanController);
543    } else {
544      Optional<Cursor> cursor = Optional.empty();
545      if (resp.hasCursor()) {
546        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
547      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
548        // It is size limit exceed and we need to return the last Result's row.
549        // When user setBatch and the scanner is reopened, the server may return Results that
550        // user has seen and the last Result can not be seen because the number is not enough.
551        // So the row keys of results may not be same, we must use the last one.
552        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
553      }
554      scanController = new ScanControllerImpl(cursor);
555      if (isHeartbeatMessage || cursor.isPresent()) {
556        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
557        // want to pass a cursor to upper layer.
558        consumer.onHeartbeat(scanController);
559      }
560    }
561    ScanControllerState state = scanController.destroy();
562    if (state == ScanControllerState.TERMINATED) {
563      stopScan(resp);
564      return;
565    }
566    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
567    if (state == ScanControllerState.SUSPENDED) {
568      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
569        return;
570      }
571    }
572    completeOrNext(resp, numberOfCompleteRows);
573  }
574
575  private void stopScan(ScanResponse resp) {
576    if (resp.getMoreResultsInRegion()) {
577      // we have more results in region but user request to stop the scan, so we need to close the
578      // scanner explicitly.
579      closeScanner();
580    }
581    completeNoMoreResults();
582  }
583
584  private void call() {
585    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
586    // less than the scan timeout. If the server does not respond in time(usually this will not
587    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
588    // resending the next request and the only way to fix this is to close the scanner and open a
589    // new one.
590    long callTimeoutNs;
591    if (scanTimeoutNs > 0) {
592      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNanos);
593      if (remainingNs <= 0) {
594        completeExceptionally(true);
595        return;
596      }
597      callTimeoutNs = remainingNs;
598    } else {
599      callTimeoutNs = 0L;
600    }
601    incRPCCallsMetrics(scanMetrics, regionServerRemote);
602    if (tries > 1) {
603      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
604    }
605    resetController(controller, callTimeoutNs, priority, loc.getRegion().getTable());
606    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
607      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
608    final Context context = Context.current();
609    stub.scan(controller, req, resp -> {
610      try (Scope ignored = context.makeCurrent()) {
611        onComplete(controller, resp);
612      }
613    });
614  }
615
616  private void next() {
617    nextCallSeq++;
618    tries = 1;
619    exceptions.clear();
620    nextCallStartNanos = System.nanoTime();
621    incMillisBetweenNextsMetrics(scanMetrics,
622      TimeUnit.NANOSECONDS.toMillis(nextCallStartNanos - lastNextCallNanos));
623    lastNextCallNanos = nextCallStartNanos;
624    call();
625  }
626
627  private void renewLease() {
628    incRPCCallsMetrics(scanMetrics, regionServerRemote);
629    nextCallSeq++;
630    resetController(controller, rpcTimeoutNs, priority, loc.getRegion().getTable());
631    ScanRequest req =
632      RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
633    stub.scan(controller, req, resp -> {
634    });
635  }
636
637  /**
638   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
639   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
640   * open scanner request is also needed because we may have some data in the CellScanner which is
641   * contained in the controller.
642   * @return {@code true} if we should continue, otherwise {@code false}.
643   */
644  public CompletableFuture<Boolean> start(HBaseRpcController controller,
645    ScanResponse respWhenOpen) {
646    onComplete(controller, respWhenOpen);
647    return future;
648  }
649}