001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
030
031import io.opentelemetry.context.Context;
032import io.opentelemetry.context.Scope;
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.List;
036import java.util.Optional;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.TimeUnit;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.hadoop.hbase.HBaseServerException;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.UnknownScannerException;
045import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
046import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
047import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
048import org.apache.hadoop.hbase.exceptions.ScannerResetException;
049import org.apache.hadoop.hbase.ipc.HBaseRpcController;
050import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
057import org.apache.hbase.thirdparty.io.netty.util.Timeout;
058import org.apache.hbase.thirdparty.io.netty.util.Timer;
059
060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
061import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
062import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
067
068/**
069 * Retry caller for scanning a region.
070 * <p>
071 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
072 * reference of this object and use it to open new single region scanners.
073 */
074@InterfaceAudience.Private
075class AsyncScanSingleRegionRpcRetryingCaller {
076
077  private static final Logger LOG =
078    LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
079
080  private final Timer retryTimer;
081
082  private final Scan scan;
083
084  private final ScanMetrics scanMetrics;
085
086  private final long scannerId;
087
088  private final ScanResultCache resultCache;
089
090  private final AdvancedScanResultConsumer consumer;
091
092  private final ClientService.Interface stub;
093
094  private final HRegionLocation loc;
095
096  private final boolean regionServerRemote;
097
098  private final int priority;
099
100  private final long scannerLeaseTimeoutPeriodNs;
101
102  private final long pauseNs;
103
104  private final long pauseNsForServerOverloaded;
105
106  private final int maxAttempts;
107
108  private final long scanTimeoutNs;
109
110  private final long rpcTimeoutNs;
111
112  private final int startLogErrorsCnt;
113
114  private final Runnable completeWhenNoMoreResultsInRegion;
115
116  private final AsyncConnectionImpl conn;
117
118  private final CompletableFuture<Boolean> future;
119
120  private final HBaseRpcController controller;
121
122  private byte[] nextStartRowWhenError;
123
124  private boolean includeNextStartRowWhenError;
125
126  private long nextCallStartNs;
127
128  private int tries;
129
130  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
131
132  private long nextCallSeq = -1L;
133
134  private enum ScanControllerState {
135    INITIALIZED,
136    SUSPENDED,
137    TERMINATED,
138    DESTROYED
139  }
140
141  // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
142  // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
143  // usage. We use two things to prevent invalid usage:
144  // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an
145  // IllegalStateException if the caller thread is not this thread.
146  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
147  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
148  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
149  // call destroy to get the current state and set the state to DESTROYED. And when user calls
150  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
151  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
152  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
153  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
154  // to be used in the future.
155  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
156  // package private methods can only be called within the implementation of
157  // AsyncScanSingleRegionRpcRetryingCaller.
158  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
159
160    // Make sure the methods are only called in this thread.
161    private final Thread callerThread;
162
163    private final Optional<Cursor> cursor;
164
165    // INITIALIZED -> SUSPENDED -> DESTROYED
166    // INITIALIZED -> TERMINATED -> DESTROYED
167    // INITIALIZED -> DESTROYED
168    // If the state is incorrect we will throw IllegalStateException.
169    private ScanControllerState state = ScanControllerState.INITIALIZED;
170
171    private ScanResumerImpl resumer;
172
173    public ScanControllerImpl(Optional<Cursor> cursor) {
174      this.callerThread = Thread.currentThread();
175      this.cursor = cursor;
176    }
177
178    private void preCheck() {
179      Preconditions.checkState(Thread.currentThread() == callerThread,
180        "The current thread is %s, expected thread is %s, "
181          + "you should not call this method outside onNext or onHeartbeat",
182        Thread.currentThread(), callerThread);
183      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
184        "Invalid Stopper state %s", state);
185    }
186
187    @Override
188    public ScanResumer suspend() {
189      preCheck();
190      state = ScanControllerState.SUSPENDED;
191      ScanResumerImpl resumer = new ScanResumerImpl();
192      this.resumer = resumer;
193      return resumer;
194    }
195
196    @Override
197    public void terminate() {
198      preCheck();
199      state = ScanControllerState.TERMINATED;
200    }
201
202    // return the current state, and set the state to DESTROYED.
203    ScanControllerState destroy() {
204      ScanControllerState state = this.state;
205      this.state = ScanControllerState.DESTROYED;
206      return state;
207    }
208
209    @Override
210    public Optional<Cursor> cursor() {
211      return cursor;
212    }
213  }
214
215  private enum ScanResumerState {
216    INITIALIZED,
217    SUSPENDED,
218    RESUMED
219  }
220
221  // The resume method is allowed to be called in another thread so here we also use the
222  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
223  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
224  // and when user calls resume method, we will change the state to RESUMED. But the resume method
225  // could be called in other thread, and in fact, user could just do this:
226  // controller.suspend().resume()
227  // This is strange but valid. This means the scan could be resumed before we call the prepare
228  // method to do the actual suspend work. So in the resume method, we will check if the state is
229  // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
230  // method, if the state is RESUMED already, we will just return an let the scan go on.
231  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
232  // package private methods can only be called within the implementation of
233  // AsyncScanSingleRegionRpcRetryingCaller.
234  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
235
236    // INITIALIZED -> SUSPENDED -> RESUMED
237    // INITIALIZED -> RESUMED
238    private ScanResumerState state = ScanResumerState.INITIALIZED;
239
240    private ScanResponse resp;
241
242    private int numberOfCompleteRows;
243
244    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
245    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
246    // every time when the previous task is finished. There could also be race as the renewal is
247    // executed in the timer thread, so we also need to check the state before lease renewal. If the
248    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
249    // renewal task.
250    private Timeout leaseRenewer;
251
252    @Override
253    public void resume() {
254      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
255      // just return at the first if condition without loading the resp and numValidResuls field. If
256      // resume is called after suspend, then it is also safe to just reference resp and
257      // numValidResults after the synchronized block as no one will change it anymore.
258      ScanResponse localResp;
259      int localNumberOfCompleteRows;
260      synchronized (this) {
261        if (state == ScanResumerState.INITIALIZED) {
262          // user calls this method before we call prepare, so just set the state to
263          // RESUMED, the implementation will just go on.
264          state = ScanResumerState.RESUMED;
265          return;
266        }
267        if (state == ScanResumerState.RESUMED) {
268          // already resumed, give up.
269          return;
270        }
271        state = ScanResumerState.RESUMED;
272        if (leaseRenewer != null) {
273          leaseRenewer.cancel();
274        }
275        localResp = this.resp;
276        localNumberOfCompleteRows = this.numberOfCompleteRows;
277      }
278      completeOrNext(localResp, localNumberOfCompleteRows);
279    }
280
281    private void scheduleRenewLeaseTask() {
282      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
283        TimeUnit.NANOSECONDS);
284    }
285
286    private synchronized void tryRenewLease() {
287      // the scan has already been resumed, give up
288      if (state == ScanResumerState.RESUMED) {
289        return;
290      }
291      renewLease();
292      // schedule the next renew lease task again as this is a one-time task.
293      scheduleRenewLeaseTask();
294    }
295
296    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
297    // for more details.
298    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
299      if (state == ScanResumerState.RESUMED) {
300        // user calls resume before we actually suspend the scan, just continue;
301        return false;
302      }
303      state = ScanResumerState.SUSPENDED;
304      this.resp = resp;
305      this.numberOfCompleteRows = numberOfCompleteRows;
306      // if there are no more results in region then the scanner at RS side will be closed
307      // automatically so we do not need to renew lease.
308      if (resp.getMoreResultsInRegion()) {
309        // schedule renew lease task
310        scheduleRenewLeaseTask();
311      }
312      return true;
313    }
314  }
315
316  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
317    Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
318    AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
319    boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
320    long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
321    int startLogErrorsCnt) {
322    this.retryTimer = retryTimer;
323    this.conn = conn;
324    this.scan = scan;
325    this.scanMetrics = scanMetrics;
326    this.scannerId = scannerId;
327    this.resultCache = resultCache;
328    this.consumer = consumer;
329    this.stub = stub;
330    this.loc = loc;
331    this.regionServerRemote = isRegionServerRemote;
332    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
333    this.pauseNs = pauseNs;
334    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
335    this.maxAttempts = maxAttempts;
336    this.scanTimeoutNs = scanTimeoutNs;
337    this.rpcTimeoutNs = rpcTimeoutNs;
338    this.startLogErrorsCnt = startLogErrorsCnt;
339    if (scan.isReversed()) {
340      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
341    } else {
342      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
343    }
344    this.future = new CompletableFuture<>();
345    this.priority = priority;
346    this.controller = conn.rpcControllerFactory.newController();
347    this.controller.setPriority(priority);
348    this.exceptions = new ArrayList<>();
349  }
350
351  private long elapsedMs() {
352    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
353  }
354
355  private long remainingTimeNs() {
356    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
357  }
358
359  private void closeScanner() {
360    incRPCCallsMetrics(scanMetrics, regionServerRemote);
361    resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
362    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
363    stub.scan(controller, req, resp -> {
364      if (controller.failed()) {
365        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
366          + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
367          + " failed, ignore, probably already closed", controller.getFailed());
368      }
369    });
370  }
371
372  private void completeExceptionally(boolean closeScanner) {
373    resultCache.clear();
374    if (closeScanner) {
375      closeScanner();
376    }
377    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
378  }
379
380  private void completeNoMoreResults() {
381    future.complete(false);
382  }
383
384  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
385    scan.withStartRow(row, inclusive);
386    future.complete(true);
387  }
388
389  private void completeWhenError(boolean closeScanner) {
390    incRPCRetriesMetrics(scanMetrics, closeScanner);
391    resultCache.clear();
392    if (closeScanner) {
393      closeScanner();
394    }
395    if (nextStartRowWhenError != null) {
396      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
397    }
398    future.complete(true);
399  }
400
401  private void onError(Throwable error) {
402    error = translateException(error);
403    if (tries > startLogErrorsCnt) {
404      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
405        + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
406        + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
407        + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
408        + " ms", error);
409    }
410    boolean scannerClosed =
411      error instanceof UnknownScannerException || error instanceof NotServingRegionException
412        || error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
413    RetriesExhaustedException.ThrowableWithExtraContext qt =
414      new RetriesExhaustedException.ThrowableWithExtraContext(error,
415        EnvironmentEdgeManager.currentTime(), "");
416    exceptions.add(qt);
417    if (tries >= maxAttempts) {
418      completeExceptionally(!scannerClosed);
419      return;
420    }
421    long delayNs;
422    long pauseNsToUse =
423      HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
424    if (scanTimeoutNs > 0) {
425      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
426      if (maxDelayNs <= 0) {
427        completeExceptionally(!scannerClosed);
428        return;
429      }
430      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
431    } else {
432      delayNs = getPauseTime(pauseNsToUse, tries - 1);
433    }
434    if (scannerClosed) {
435      completeWhenError(false);
436      return;
437    }
438    if (error instanceof OutOfOrderScannerNextException) {
439      completeWhenError(true);
440      return;
441    }
442    if (error instanceof DoNotRetryIOException) {
443      completeExceptionally(true);
444      return;
445    }
446    tries++;
447
448    if (HBaseServerException.isServerOverloaded(error)) {
449      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
450      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
451    }
452    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
453  }
454
455  private void updateNextStartRowWhenError(Result result) {
456    nextStartRowWhenError = result.getRow();
457    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
458  }
459
460  private void completeWhenNoMoreResultsInRegion() {
461    if (noMoreResultsForScan(scan, loc.getRegion())) {
462      completeNoMoreResults();
463    } else {
464      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
465    }
466  }
467
468  private void completeReversedWhenNoMoreResultsInRegion() {
469    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
470      completeNoMoreResults();
471    } else {
472      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
473    }
474  }
475
476  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
477    if (resp.hasMoreResults() && !resp.getMoreResults()) {
478      // RS tells us there is no more data for the whole scan
479      completeNoMoreResults();
480      return;
481    }
482    if (scan.getLimit() > 0) {
483      // The RS should have set the moreResults field in ScanResponse to false when we have reached
484      // the limit, so we add an assert here.
485      int newLimit = scan.getLimit() - numberOfCompleteRows;
486      assert newLimit > 0;
487      scan.setLimit(newLimit);
488    }
489    // as in 2.0 this value will always be set
490    if (!resp.getMoreResultsInRegion()) {
491      completeWhenNoMoreResultsInRegion.run();
492      return;
493    }
494    next();
495  }
496
497  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
498    if (controller.failed()) {
499      onError(controller.getFailed());
500      return;
501    }
502    updateServerSideMetrics(scanMetrics, resp);
503    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
504    Result[] rawResults;
505    Result[] results;
506    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
507    try {
508      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
509      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
510      results = resultCache.addAndGet(
511        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
512        isHeartbeatMessage);
513    } catch (IOException e) {
514      // We can not retry here. The server has responded normally and the call sequence has been
515      // increased so a new scan with the same call sequence will cause an
516      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
517      LOG.warn("decode scan response failed", e);
518      completeWhenError(true);
519      return;
520    }
521
522    ScanControllerImpl scanController;
523    if (results.length > 0) {
524      scanController = new ScanControllerImpl(
525        resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty());
526      updateNextStartRowWhenError(results[results.length - 1]);
527      consumer.onNext(results, scanController);
528    } else {
529      Optional<Cursor> cursor = Optional.empty();
530      if (resp.hasCursor()) {
531        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
532      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
533        // It is size limit exceed and we need to return the last Result's row.
534        // When user setBatch and the scanner is reopened, the server may return Results that
535        // user has seen and the last Result can not be seen because the number is not enough.
536        // So the row keys of results may not be same, we must use the last one.
537        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
538      }
539      scanController = new ScanControllerImpl(cursor);
540      if (isHeartbeatMessage || cursor.isPresent()) {
541        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
542        // want to pass a cursor to upper layer.
543        consumer.onHeartbeat(scanController);
544      }
545    }
546    ScanControllerState state = scanController.destroy();
547    if (state == ScanControllerState.TERMINATED) {
548      if (resp.getMoreResultsInRegion()) {
549        // we have more results in region but user request to stop the scan, so we need to close the
550        // scanner explicitly.
551        closeScanner();
552      }
553      completeNoMoreResults();
554      return;
555    }
556    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
557    if (state == ScanControllerState.SUSPENDED) {
558      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
559        return;
560      }
561    }
562    completeOrNext(resp, numberOfCompleteRows);
563  }
564
565  private void call() {
566    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
567    // less than the scan timeout. If the server does not respond in time(usually this will not
568    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
569    // resending the next request and the only way to fix this is to close the scanner and open a
570    // new one.
571    long callTimeoutNs;
572    if (scanTimeoutNs > 0) {
573      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
574      if (remainingNs <= 0) {
575        completeExceptionally(true);
576        return;
577      }
578      callTimeoutNs = remainingNs;
579    } else {
580      callTimeoutNs = 0L;
581    }
582    incRPCCallsMetrics(scanMetrics, regionServerRemote);
583    if (tries > 1) {
584      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
585    }
586    resetController(controller, callTimeoutNs, priority);
587    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
588      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
589    final Context context = Context.current();
590    stub.scan(controller, req, resp -> {
591      try (Scope ignored = context.makeCurrent()) {
592        onComplete(controller, resp);
593      }
594    });
595  }
596
597  private void next() {
598    nextCallSeq++;
599    tries = 1;
600    exceptions.clear();
601    nextCallStartNs = System.nanoTime();
602    call();
603  }
604
605  private void renewLease() {
606    incRPCCallsMetrics(scanMetrics, regionServerRemote);
607    nextCallSeq++;
608    resetController(controller, rpcTimeoutNs, priority);
609    ScanRequest req =
610      RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
611    stub.scan(controller, req, resp -> {
612    });
613  }
614
615  /**
616   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
617   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
618   * open scanner request is also needed because we may have some data in the CellScanner which is
619   * contained in the controller.
620   * @return {@code true} if we should continue, otherwise {@code false}.
621   */
622  public CompletableFuture<Boolean> start(HBaseRpcController controller,
623    ScanResponse respWhenOpen) {
624    onComplete(controller, respWhenOpen);
625    return future;
626  }
627}