001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
030
031import io.opentelemetry.context.Context;
032import io.opentelemetry.context.Scope;
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.List;
036import java.util.Optional;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.TimeUnit;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.hadoop.hbase.HBaseServerException;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.UnknownScannerException;
045import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
046import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
047import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
048import org.apache.hadoop.hbase.exceptions.ScannerResetException;
049import org.apache.hadoop.hbase.ipc.HBaseRpcController;
050import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
057import org.apache.hbase.thirdparty.io.netty.util.Timeout;
058import org.apache.hbase.thirdparty.io.netty.util.Timer;
059
060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
061import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
062import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
067
068/**
069 * Retry caller for scanning a region.
070 * <p>
071 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
072 * reference of this object and use it to open new single region scanners.
073 */
074@InterfaceAudience.Private
075class AsyncScanSingleRegionRpcRetryingCaller {
076
077  private static final Logger LOG =
078    LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
079
080  private final Timer retryTimer;
081
082  private final Scan scan;
083
084  private final ScanMetrics scanMetrics;
085
086  private final long scannerId;
087
088  private final ScanResultCache resultCache;
089
090  private final AdvancedScanResultConsumer consumer;
091
092  private final ClientService.Interface stub;
093
094  private final HRegionLocation loc;
095
096  private final boolean regionServerRemote;
097
098  private final int priority;
099
100  private final long scannerLeaseTimeoutPeriodNs;
101
102  private final long pauseNs;
103
104  private final long pauseNsForServerOverloaded;
105
106  private final int maxAttempts;
107
108  private final long scanTimeoutNs;
109
110  private final long rpcTimeoutNs;
111
112  private final int startLogErrorsCnt;
113
114  private final Runnable completeWhenNoMoreResultsInRegion;
115
116  private final CompletableFuture<Boolean> future;
117
118  private final HBaseRpcController controller;
119
120  private byte[] nextStartRowWhenError;
121
122  private boolean includeNextStartRowWhenError;
123
124  private long nextCallStartNs;
125
126  private int tries;
127
128  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
129
130  private long nextCallSeq = -1L;
131
132  private enum ScanControllerState {
133    INITIALIZED,
134    SUSPENDED,
135    TERMINATED,
136    DESTROYED
137  }
138
  // Since suspend and terminate should only be called within onNext or onHeartbeat (see the
  // comments of AdvancedScanResultConsumer.onNext and onHeartbeat), we need to add some checks to
  // prevent invalid usage. We use two things to prevent invalid usage:
  // 1. Record the thread that constructs the ScanControllerImpl instance. We will throw an
  // IllegalStateException if the caller thread is not this thread.
144  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
145  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
146  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
147  // call destroy to get the current state and set the state to DESTROYED. And when user calls
148  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
149  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
150  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
151  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
152  // to be used in the future.
  // Notice that the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
156  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
157
158    // Make sure the methods are only called in this thread.
159    private final Thread callerThread;
160
161    private final Optional<Cursor> cursor;
162
163    // INITIALIZED -> SUSPENDED -> DESTROYED
164    // INITIALIZED -> TERMINATED -> DESTROYED
165    // INITIALIZED -> DESTROYED
166    // If the state is incorrect we will throw IllegalStateException.
167    private ScanControllerState state = ScanControllerState.INITIALIZED;
168
169    private ScanResumerImpl resumer;
170
171    public ScanControllerImpl(Optional<Cursor> cursor) {
172      this.callerThread = Thread.currentThread();
173      this.cursor = cursor;
174    }
175
176    private void preCheck() {
177      Preconditions.checkState(Thread.currentThread() == callerThread,
178        "The current thread is %s, expected thread is %s, "
179          + "you should not call this method outside onNext or onHeartbeat",
180        Thread.currentThread(), callerThread);
181      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
182        "Invalid Stopper state %s", state);
183    }
184
185    @Override
186    public ScanResumer suspend() {
187      preCheck();
188      state = ScanControllerState.SUSPENDED;
189      ScanResumerImpl resumer = new ScanResumerImpl();
190      this.resumer = resumer;
191      return resumer;
192    }
193
194    @Override
195    public void terminate() {
196      preCheck();
197      state = ScanControllerState.TERMINATED;
198    }
199
200    // return the current state, and set the state to DESTROYED.
201    ScanControllerState destroy() {
202      ScanControllerState state = this.state;
203      this.state = ScanControllerState.DESTROYED;
204      return state;
205    }
206
207    @Override
208    public Optional<Cursor> cursor() {
209      return cursor;
210    }
211  }
212
213  private enum ScanResumerState {
214    INITIALIZED,
215    SUSPENDED,
216    RESUMED
217  }
218
219  // The resume method is allowed to be called in another thread so here we also use the
220  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
221  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
222  // and when user calls resume method, we will change the state to RESUMED. But the resume method
223  // could be called in other thread, and in fact, user could just do this:
224  // controller.suspend().resume()
  // This is strange but valid. This means the scan could be resumed before we call the prepare
  // method to do the actual suspend work. So in the resume method, we will check if the state is
  // INITIALIZED; if it is, then we will just set the state to RESUMED and return. And in the
  // prepare method, if the state is RESUMED already, we will just return and let the scan go on.
  // Notice that the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
232  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
233
234    // INITIALIZED -> SUSPENDED -> RESUMED
235    // INITIALIZED -> RESUMED
236    private ScanResumerState state = ScanResumerState.INITIALIZED;
237
238    private ScanResponse resp;
239
240    private int numberOfCompleteRows;
241
242    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
243    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
244    // every time when the previous task is finished. There could also be race as the renewal is
245    // executed in the timer thread, so we also need to check the state before lease renewal. If the
246    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
247    // renewal task.
248    private Timeout leaseRenewer;
249
250    @Override
251    public void resume() {
252      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
253      // just return at the first if condition without loading the resp and numValidResuls field. If
254      // resume is called after suspend, then it is also safe to just reference resp and
255      // numValidResults after the synchronized block as no one will change it anymore.
256      ScanResponse localResp;
257      int localNumberOfCompleteRows;
258      synchronized (this) {
259        if (state == ScanResumerState.INITIALIZED) {
260          // user calls this method before we call prepare, so just set the state to
261          // RESUMED, the implementation will just go on.
262          state = ScanResumerState.RESUMED;
263          return;
264        }
265        if (state == ScanResumerState.RESUMED) {
266          // already resumed, give up.
267          return;
268        }
269        state = ScanResumerState.RESUMED;
270        if (leaseRenewer != null) {
271          leaseRenewer.cancel();
272        }
273        localResp = this.resp;
274        localNumberOfCompleteRows = this.numberOfCompleteRows;
275      }
276      completeOrNext(localResp, localNumberOfCompleteRows);
277    }
278
279    private void scheduleRenewLeaseTask() {
280      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
281        TimeUnit.NANOSECONDS);
282    }
283
284    private synchronized void tryRenewLease() {
285      // the scan has already been resumed, give up
286      if (state == ScanResumerState.RESUMED) {
287        return;
288      }
289      renewLease();
290      // schedule the next renew lease task again as this is a one-time task.
291      scheduleRenewLeaseTask();
292    }
293
294    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
295    // for more details.
296    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
297      if (state == ScanResumerState.RESUMED) {
298        // user calls resume before we actually suspend the scan, just continue;
299        return false;
300      }
301      state = ScanResumerState.SUSPENDED;
302      this.resp = resp;
303      this.numberOfCompleteRows = numberOfCompleteRows;
304      // if there are no more results in region then the scanner at RS side will be closed
305      // automatically so we do not need to renew lease.
306      if (resp.getMoreResultsInRegion()) {
307        // schedule renew lease task
308        scheduleRenewLeaseTask();
309      }
310      return true;
311    }
312  }
313
314  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
315    Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
316    AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
317    boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
318    long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
319    int startLogErrorsCnt) {
320    this.retryTimer = retryTimer;
321    this.scan = scan;
322    this.scanMetrics = scanMetrics;
323    this.scannerId = scannerId;
324    this.resultCache = resultCache;
325    this.consumer = consumer;
326    this.stub = stub;
327    this.loc = loc;
328    this.regionServerRemote = isRegionServerRemote;
329    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
330    this.pauseNs = pauseNs;
331    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
332    this.maxAttempts = maxAttempts;
333    this.scanTimeoutNs = scanTimeoutNs;
334    this.rpcTimeoutNs = rpcTimeoutNs;
335    this.startLogErrorsCnt = startLogErrorsCnt;
336    if (scan.isReversed()) {
337      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
338    } else {
339      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
340    }
341    this.future = new CompletableFuture<>();
342    this.priority = priority;
343    this.controller = conn.rpcControllerFactory.newController();
344    this.controller.setPriority(priority);
345    this.exceptions = new ArrayList<>();
346  }
347
348  private long elapsedMs() {
349    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
350  }
351
352  private long remainingTimeNs() {
353    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
354  }
355
356  private void closeScanner() {
357    incRPCCallsMetrics(scanMetrics, regionServerRemote);
358    resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
359    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
360    stub.scan(controller, req, resp -> {
361      if (controller.failed()) {
362        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
363          + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
364          + " failed, ignore, probably already closed", controller.getFailed());
365      }
366    });
367  }
368
369  private void completeExceptionally(boolean closeScanner) {
370    resultCache.clear();
371    if (closeScanner) {
372      closeScanner();
373    }
374    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
375  }
376
377  private void completeNoMoreResults() {
378    future.complete(false);
379  }
380
381  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
382    scan.withStartRow(row, inclusive);
383    future.complete(true);
384  }
385
386  private void completeWhenError(boolean closeScanner) {
387    incRPCRetriesMetrics(scanMetrics, closeScanner);
388    resultCache.clear();
389    if (closeScanner) {
390      closeScanner();
391    }
392    if (nextStartRowWhenError != null) {
393      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
394    }
395    future.complete(true);
396  }
397
398  private void onError(Throwable error) {
399    error = translateException(error);
400    if (tries > startLogErrorsCnt) {
401      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
402        + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
403        + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
404        + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
405        + " ms", error);
406    }
407    boolean scannerClosed =
408      error instanceof UnknownScannerException || error instanceof NotServingRegionException
409        || error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
410    RetriesExhaustedException.ThrowableWithExtraContext qt =
411      new RetriesExhaustedException.ThrowableWithExtraContext(error,
412        EnvironmentEdgeManager.currentTime(), "");
413    exceptions.add(qt);
414    if (tries >= maxAttempts) {
415      completeExceptionally(!scannerClosed);
416      return;
417    }
418    long delayNs;
419    long pauseNsToUse =
420      HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
421    if (scanTimeoutNs > 0) {
422      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
423      if (maxDelayNs <= 0) {
424        completeExceptionally(!scannerClosed);
425        return;
426      }
427      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
428    } else {
429      delayNs = getPauseTime(pauseNsToUse, tries - 1);
430    }
431    if (scannerClosed) {
432      completeWhenError(false);
433      return;
434    }
435    if (error instanceof OutOfOrderScannerNextException) {
436      completeWhenError(true);
437      return;
438    }
439    if (error instanceof DoNotRetryIOException) {
440      completeExceptionally(true);
441      return;
442    }
443    tries++;
444    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
445  }
446
447  private void updateNextStartRowWhenError(Result result) {
448    nextStartRowWhenError = result.getRow();
449    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
450  }
451
452  private void completeWhenNoMoreResultsInRegion() {
453    if (noMoreResultsForScan(scan, loc.getRegion())) {
454      completeNoMoreResults();
455    } else {
456      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
457    }
458  }
459
460  private void completeReversedWhenNoMoreResultsInRegion() {
461    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
462      completeNoMoreResults();
463    } else {
464      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
465    }
466  }
467
468  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
469    if (resp.hasMoreResults() && !resp.getMoreResults()) {
470      // RS tells us there is no more data for the whole scan
471      completeNoMoreResults();
472      return;
473    }
474    if (scan.getLimit() > 0) {
475      // The RS should have set the moreResults field in ScanResponse to false when we have reached
476      // the limit, so we add an assert here.
477      int newLimit = scan.getLimit() - numberOfCompleteRows;
478      assert newLimit > 0;
479      scan.setLimit(newLimit);
480    }
481    // as in 2.0 this value will always be set
482    if (!resp.getMoreResultsInRegion()) {
483      completeWhenNoMoreResultsInRegion.run();
484      return;
485    }
486    next();
487  }
488
489  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
490    if (controller.failed()) {
491      onError(controller.getFailed());
492      return;
493    }
494    updateServerSideMetrics(scanMetrics, resp);
495    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
496    Result[] rawResults;
497    Result[] results;
498    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
499    try {
500      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
501      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
502      results = resultCache.addAndGet(
503        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
504        isHeartbeatMessage);
505    } catch (IOException e) {
506      // We can not retry here. The server has responded normally and the call sequence has been
507      // increased so a new scan with the same call sequence will cause an
508      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
509      LOG.warn("decode scan response failed", e);
510      completeWhenError(true);
511      return;
512    }
513
514    ScanControllerImpl scanController;
515    if (results.length > 0) {
516      scanController = new ScanControllerImpl(
517        resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty());
518      updateNextStartRowWhenError(results[results.length - 1]);
519      consumer.onNext(results, scanController);
520    } else {
521      Optional<Cursor> cursor = Optional.empty();
522      if (resp.hasCursor()) {
523        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
524      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
525        // It is size limit exceed and we need to return the last Result's row.
526        // When user setBatch and the scanner is reopened, the server may return Results that
527        // user has seen and the last Result can not be seen because the number is not enough.
528        // So the row keys of results may not be same, we must use the last one.
529        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
530      }
531      scanController = new ScanControllerImpl(cursor);
532      if (isHeartbeatMessage || cursor.isPresent()) {
533        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
534        // want to pass a cursor to upper layer.
535        consumer.onHeartbeat(scanController);
536      }
537    }
538    ScanControllerState state = scanController.destroy();
539    if (state == ScanControllerState.TERMINATED) {
540      if (resp.getMoreResultsInRegion()) {
541        // we have more results in region but user request to stop the scan, so we need to close the
542        // scanner explicitly.
543        closeScanner();
544      }
545      completeNoMoreResults();
546      return;
547    }
548    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
549    if (state == ScanControllerState.SUSPENDED) {
550      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
551        return;
552      }
553    }
554    completeOrNext(resp, numberOfCompleteRows);
555  }
556
557  private void call() {
558    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
559    // less than the scan timeout. If the server does not respond in time(usually this will not
560    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
561    // resending the next request and the only way to fix this is to close the scanner and open a
562    // new one.
563    long callTimeoutNs;
564    if (scanTimeoutNs > 0) {
565      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
566      if (remainingNs <= 0) {
567        completeExceptionally(true);
568        return;
569      }
570      callTimeoutNs = remainingNs;
571    } else {
572      callTimeoutNs = 0L;
573    }
574    incRPCCallsMetrics(scanMetrics, regionServerRemote);
575    if (tries > 1) {
576      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
577    }
578    resetController(controller, callTimeoutNs, priority);
579    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
580      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
581    final Context context = Context.current();
582    stub.scan(controller, req, resp -> {
583      try (Scope ignored = context.makeCurrent()) {
584        onComplete(controller, resp);
585      }
586    });
587  }
588
589  private void next() {
590    nextCallSeq++;
591    tries = 1;
592    exceptions.clear();
593    nextCallStartNs = System.nanoTime();
594    call();
595  }
596
597  private void renewLease() {
598    incRPCCallsMetrics(scanMetrics, regionServerRemote);
599    nextCallSeq++;
600    resetController(controller, rpcTimeoutNs, priority);
601    ScanRequest req =
602      RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
603    stub.scan(controller, req, resp -> {
604    });
605  }
606
607  /**
608   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
609   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
610   * open scanner request is also needed because we may have some data in the CellScanner which is
611   * contained in the controller.
612   * @return {@code true} if we should continue, otherwise {@code false}.
613   */
614  public CompletableFuture<Boolean> start(HBaseRpcController controller,
615    ScanResponse respWhenOpen) {
616    onComplete(controller, respWhenOpen);
617    return future;
618  }
619}