001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
028
029import io.opentelemetry.context.Context;
030import io.opentelemetry.context.Scope;
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.OptionalLong;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.TimeUnit;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.hadoop.hbase.HBaseServerException;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.UnknownScannerException;
045import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
046import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
047import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
048import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
049import org.apache.hadoop.hbase.exceptions.ScannerResetException;
050import org.apache.hadoop.hbase.ipc.HBaseRpcController;
051import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
058import org.apache.hbase.thirdparty.io.netty.util.Timeout;
059import org.apache.hbase.thirdparty.io.netty.util.Timer;
060
061import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
063import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
068
069/**
070 * Retry caller for scanning a region.
071 * <p>
072 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
073 * reference of this object and use it to open new single region scanners.
074 */
075@InterfaceAudience.Private
076class AsyncScanSingleRegionRpcRetryingCaller {
077
078  private static final Logger LOG =
079    LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
080
081  private final Timer retryTimer;
082
083  private final Scan scan;
084
085  private final ScanMetrics scanMetrics;
086
087  private final long scannerId;
088
089  private final ScanResultCache resultCache;
090
091  private final AdvancedScanResultConsumer consumer;
092
093  private final ClientService.Interface stub;
094
095  private final HRegionLocation loc;
096
097  private final boolean regionServerRemote;
098
099  private final int priority;
100
101  private final long scannerLeaseTimeoutPeriodNs;
102
103  private final int maxAttempts;
104
105  private final long scanTimeoutNs;
106
107  private final long rpcTimeoutNs;
108
109  private final int startLogErrorsCnt;
110
111  private final Runnable completeWhenNoMoreResultsInRegion;
112
113  private final AsyncConnectionImpl conn;
114
115  private final CompletableFuture<Boolean> future;
116
117  private final HBaseRpcController controller;
118
119  private byte[] nextStartRowWhenError;
120
121  private boolean includeNextStartRowWhenError;
122
123  private long nextCallStartNs;
124
125  private int tries;
126
127  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
128
129  private long nextCallSeq = -1L;
130
131  private final HBaseServerExceptionPauseManager pauseManager;
132
133  private enum ScanControllerState {
134    INITIALIZED,
135    SUSPENDED,
136    TERMINATED,
137    DESTROYED
138  }
139
140  // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
141  // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
142  // usage. We use two things to prevent invalid usage:
143  // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an
144  // IllegalStateException if the caller thread is not this thread.
145  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
146  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
147  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
148  // call destroy to get the current state and set the state to DESTROYED. And when user calls
149  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
150  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
151  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
152  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
153  // to be used in the future.
154  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
155  // package private methods can only be called within the implementation of
156  // AsyncScanSingleRegionRpcRetryingCaller.
157  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
158
159    // Make sure the methods are only called in this thread.
160    private final Thread callerThread;
161
162    private final Optional<Cursor> cursor;
163
164    // INITIALIZED -> SUSPENDED -> DESTROYED
165    // INITIALIZED -> TERMINATED -> DESTROYED
166    // INITIALIZED -> DESTROYED
167    // If the state is incorrect we will throw IllegalStateException.
168    private ScanControllerState state = ScanControllerState.INITIALIZED;
169
170    private ScanResumerImpl resumer;
171
172    public ScanControllerImpl(Optional<Cursor> cursor) {
173      this.callerThread = Thread.currentThread();
174      this.cursor = cursor;
175    }
176
177    private void preCheck() {
178      Preconditions.checkState(Thread.currentThread() == callerThread,
179        "The current thread is %s, expected thread is %s, "
180          + "you should not call this method outside onNext or onHeartbeat",
181        Thread.currentThread(), callerThread);
182      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
183        "Invalid Stopper state %s", state);
184    }
185
186    @Override
187    public ScanResumer suspend() {
188      preCheck();
189      state = ScanControllerState.SUSPENDED;
190      ScanResumerImpl resumer = new ScanResumerImpl();
191      this.resumer = resumer;
192      return resumer;
193    }
194
195    @Override
196    public void terminate() {
197      preCheck();
198      state = ScanControllerState.TERMINATED;
199    }
200
201    // return the current state, and set the state to DESTROYED.
202    ScanControllerState destroy() {
203      ScanControllerState state = this.state;
204      this.state = ScanControllerState.DESTROYED;
205      return state;
206    }
207
208    @Override
209    public Optional<Cursor> cursor() {
210      return cursor;
211    }
212  }
213
214  private enum ScanResumerState {
215    INITIALIZED,
216    SUSPENDED,
217    RESUMED
218  }
219
220  // The resume method is allowed to be called in another thread so here we also use the
221  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
222  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
223  // and when user calls resume method, we will change the state to RESUMED. But the resume method
224  // could be called in other thread, and in fact, user could just do this:
225  // controller.suspend().resume()
226  // This is strange but valid. This means the scan could be resumed before we call the prepare
227  // method to do the actual suspend work. So in the resume method, we will check if the state is
228  // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
229  // method, if the state is RESUMED already, we will just return an let the scan go on.
230  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
231  // package private methods can only be called within the implementation of
232  // AsyncScanSingleRegionRpcRetryingCaller.
233  @InterfaceAudience.Private
234  final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
235
236    // INITIALIZED -> SUSPENDED -> RESUMED
237    // INITIALIZED -> RESUMED
238    private ScanResumerState state = ScanResumerState.INITIALIZED;
239
240    private ScanResponse resp;
241
242    private int numberOfCompleteRows;
243
244    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
245    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
246    // every time when the previous task is finished. There could also be race as the renewal is
247    // executed in the timer thread, so we also need to check the state before lease renewal. If the
248    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
249    // renewal task.
250    private Timeout leaseRenewer;
251
252    @Override
253    public void resume() {
254      doResume(false);
255    }
256
257    /**
258     * This method is used when {@link ScanControllerImpl#suspend} had ever been called to get a
259     * {@link ScanResumerImpl}, but now user stops scan and does not need any more scan results.
260     */
261    public void terminate() {
262      doResume(true);
263    }
264
265    private void doResume(boolean stopScan) {
266      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
267      // just return at the first if condition without loading the resp and numValidResuls field. If
268      // resume is called after suspend, then it is also safe to just reference resp and
269      // numValidResults after the synchronized block as no one will change it anymore.
270      ScanResponse localResp;
271      int localNumberOfCompleteRows;
272      synchronized (this) {
273        if (state == ScanResumerState.INITIALIZED) {
274          // user calls this method before we call prepare, so just set the state to
275          // RESUMED, the implementation will just go on.
276          state = ScanResumerState.RESUMED;
277          return;
278        }
279        if (state == ScanResumerState.RESUMED) {
280          // already resumed, give up.
281          return;
282        }
283        state = ScanResumerState.RESUMED;
284        if (leaseRenewer != null) {
285          leaseRenewer.cancel();
286        }
287        localResp = this.resp;
288        localNumberOfCompleteRows = this.numberOfCompleteRows;
289      }
290      if (stopScan) {
291        stopScan(localResp);
292      } else {
293        completeOrNext(localResp, localNumberOfCompleteRows);
294      }
295    }
296
297    private void scheduleRenewLeaseTask() {
298      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
299        TimeUnit.NANOSECONDS);
300    }
301
302    private synchronized void tryRenewLease() {
303      // the scan has already been resumed, give up
304      if (state == ScanResumerState.RESUMED) {
305        return;
306      }
307      renewLease();
308      // schedule the next renew lease task again as this is a one-time task.
309      scheduleRenewLeaseTask();
310    }
311
312    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
313    // for more details.
314    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
315      if (state == ScanResumerState.RESUMED) {
316        // user calls resume before we actually suspend the scan, just continue;
317        return false;
318      }
319      state = ScanResumerState.SUSPENDED;
320      this.resp = resp;
321      this.numberOfCompleteRows = numberOfCompleteRows;
322      // if there are no more results in region then the scanner at RS side will be closed
323      // automatically so we do not need to renew lease.
324      if (resp.getMoreResultsInRegion()) {
325        // schedule renew lease task
326        scheduleRenewLeaseTask();
327      }
328      return true;
329    }
330  }
331
332  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
333    Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
334    AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
335    boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
336    long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
337    int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
338    this.retryTimer = retryTimer;
339    this.conn = conn;
340    this.scan = scan;
341    this.scanMetrics = scanMetrics;
342    this.scannerId = scannerId;
343    this.resultCache = resultCache;
344    this.consumer = consumer;
345    this.stub = stub;
346    this.loc = loc;
347    this.regionServerRemote = isRegionServerRemote;
348    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
349    this.maxAttempts = maxAttempts;
350    this.scanTimeoutNs = scanTimeoutNs;
351    this.rpcTimeoutNs = rpcTimeoutNs;
352    this.startLogErrorsCnt = startLogErrorsCnt;
353    if (scan.isReversed()) {
354      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
355    } else {
356      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
357    }
358    this.future = new CompletableFuture<>();
359    this.priority = priority;
360    this.controller = conn.rpcControllerFactory.newController();
361    this.controller.setPriority(priority);
362    this.controller.setRequestAttributes(requestAttributes);
363    this.exceptions = new ArrayList<>();
364    this.pauseManager =
365      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
366  }
367
368  private long elapsedMs() {
369    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
370  }
371
372  private void closeScanner() {
373    incRPCCallsMetrics(scanMetrics, regionServerRemote);
374    resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS, loc.getRegion().getTable());
375    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
376    stub.scan(controller, req, resp -> {
377      if (controller.failed()) {
378        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
379          + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
380          + " failed, ignore, probably already closed", controller.getFailed());
381      }
382    });
383  }
384
385  private void completeExceptionally(boolean closeScanner) {
386    resultCache.clear();
387    if (closeScanner) {
388      closeScanner();
389    }
390    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
391  }
392
393  private void completeNoMoreResults() {
394    future.complete(false);
395  }
396
397  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
398    scan.withStartRow(row, inclusive);
399    future.complete(true);
400  }
401
402  private void completeWhenError(boolean closeScanner) {
403    incRPCRetriesMetrics(scanMetrics, closeScanner);
404    resultCache.clear();
405    if (closeScanner) {
406      closeScanner();
407    }
408    if (nextStartRowWhenError != null) {
409      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
410    }
411    future.complete(true);
412  }
413
414  private void onError(Throwable error) {
415    error = translateException(error);
416    if (tries > startLogErrorsCnt) {
417      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
418        + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
419        + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
420        + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
421        + " ms", error);
422    }
423    boolean scannerClosed =
424      error instanceof UnknownScannerException || error instanceof NotServingRegionException
425        || error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
426    RetriesExhaustedException.ThrowableWithExtraContext qt =
427      new RetriesExhaustedException.ThrowableWithExtraContext(error,
428        EnvironmentEdgeManager.currentTime(), "");
429    exceptions.add(qt);
430    if (tries >= maxAttempts) {
431      completeExceptionally(!scannerClosed);
432      return;
433    }
434
435    OptionalLong maybePauseNsToUse =
436      pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
437    if (!maybePauseNsToUse.isPresent()) {
438      completeExceptionally(!scannerClosed);
439      return;
440    }
441    long delayNs = maybePauseNsToUse.getAsLong();
442    if (scannerClosed) {
443      completeWhenError(false);
444      return;
445    }
446    if (error instanceof OutOfOrderScannerNextException) {
447      completeWhenError(true);
448      return;
449    }
450    if (error instanceof DoNotRetryIOException) {
451      completeExceptionally(true);
452      return;
453    }
454    tries++;
455
456    if (HBaseServerException.isServerOverloaded(error)) {
457      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
458      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
459    }
460    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
461  }
462
463  private void updateNextStartRowWhenError(Result result) {
464    nextStartRowWhenError = result.getRow();
465    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
466  }
467
468  private void completeWhenNoMoreResultsInRegion() {
469    if (noMoreResultsForScan(scan, loc.getRegion())) {
470      completeNoMoreResults();
471    } else {
472      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
473    }
474  }
475
476  private void completeReversedWhenNoMoreResultsInRegion() {
477    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
478      completeNoMoreResults();
479    } else {
480      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
481    }
482  }
483
484  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
485    if (resp.hasMoreResults() && !resp.getMoreResults()) {
486      // RS tells us there is no more data for the whole scan
487      completeNoMoreResults();
488      return;
489    }
490    if (scan.getLimit() > 0) {
491      // The RS should have set the moreResults field in ScanResponse to false when we have reached
492      // the limit, so we add an assert here.
493      int newLimit = scan.getLimit() - numberOfCompleteRows;
494      assert newLimit > 0;
495      scan.setLimit(newLimit);
496    }
497    // as in 2.0 this value will always be set
498    if (!resp.getMoreResultsInRegion()) {
499      completeWhenNoMoreResultsInRegion.run();
500      return;
501    }
502    next();
503  }
504
505  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
506    if (controller.failed()) {
507      onError(controller.getFailed());
508      return;
509    }
510    updateServerSideMetrics(scanMetrics, resp);
511    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
512    Result[] rawResults;
513    Result[] results;
514    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
515    try {
516      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
517      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
518      results = resultCache.addAndGet(
519        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
520        isHeartbeatMessage);
521    } catch (IOException e) {
522      // We can not retry here. The server has responded normally and the call sequence has been
523      // increased so a new scan with the same call sequence will cause an
524      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
525      LOG.warn("decode scan response failed", e);
526      completeWhenError(true);
527      return;
528    }
529
530    ScanControllerImpl scanController;
531    if (results.length > 0) {
532      scanController = new ScanControllerImpl(
533        resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty());
534      updateNextStartRowWhenError(results[results.length - 1]);
535      consumer.onNext(results, scanController);
536    } else {
537      Optional<Cursor> cursor = Optional.empty();
538      if (resp.hasCursor()) {
539        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
540      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
541        // It is size limit exceed and we need to return the last Result's row.
542        // When user setBatch and the scanner is reopened, the server may return Results that
543        // user has seen and the last Result can not be seen because the number is not enough.
544        // So the row keys of results may not be same, we must use the last one.
545        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
546      }
547      scanController = new ScanControllerImpl(cursor);
548      if (isHeartbeatMessage || cursor.isPresent()) {
549        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
550        // want to pass a cursor to upper layer.
551        consumer.onHeartbeat(scanController);
552      }
553    }
554    ScanControllerState state = scanController.destroy();
555    if (state == ScanControllerState.TERMINATED) {
556      stopScan(resp);
557      return;
558    }
559    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
560    if (state == ScanControllerState.SUSPENDED) {
561      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
562        return;
563      }
564    }
565    completeOrNext(resp, numberOfCompleteRows);
566  }
567
568  private void stopScan(ScanResponse resp) {
569    if (resp.getMoreResultsInRegion()) {
570      // we have more results in region but user request to stop the scan, so we need to close the
571      // scanner explicitly.
572      closeScanner();
573    }
574    completeNoMoreResults();
575  }
576
577  private void call() {
578    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
579    // less than the scan timeout. If the server does not respond in time(usually this will not
580    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
581    // resending the next request and the only way to fix this is to close the scanner and open a
582    // new one.
583    long callTimeoutNs;
584    if (scanTimeoutNs > 0) {
585      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
586      if (remainingNs <= 0) {
587        completeExceptionally(true);
588        return;
589      }
590      callTimeoutNs = remainingNs;
591    } else {
592      callTimeoutNs = 0L;
593    }
594    incRPCCallsMetrics(scanMetrics, regionServerRemote);
595    if (tries > 1) {
596      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
597    }
598    resetController(controller, callTimeoutNs, priority, loc.getRegion().getTable());
599    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
600      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
601    final Context context = Context.current();
602    stub.scan(controller, req, resp -> {
603      try (Scope ignored = context.makeCurrent()) {
604        onComplete(controller, resp);
605      }
606    });
607  }
608
609  private void next() {
610    nextCallSeq++;
611    tries = 1;
612    exceptions.clear();
613    nextCallStartNs = System.nanoTime();
614    call();
615  }
616
617  private void renewLease() {
618    incRPCCallsMetrics(scanMetrics, regionServerRemote);
619    nextCallSeq++;
620    resetController(controller, rpcTimeoutNs, priority, loc.getRegion().getTable());
621    ScanRequest req =
622      RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
623    stub.scan(controller, req, resp -> {
624    });
625  }
626
627  /**
628   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
629   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
630   * open scanner request is also needed because we may have some data in the CellScanner which is
631   * contained in the controller.
632   * @return {@code true} if we should continue, otherwise {@code false}.
633   */
634  public CompletableFuture<Boolean> start(HBaseRpcController controller,
635    ScanResponse respWhenOpen) {
636    onComplete(controller, respWhenOpen);
637    return future;
638  }
639}