001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
028
029import io.opentelemetry.context.Context;
030import io.opentelemetry.context.Scope;
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.OptionalLong;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.TimeUnit;
039import org.apache.hadoop.hbase.DoNotRetryIOException;
040import org.apache.hadoop.hbase.HBaseServerException;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.UnknownScannerException;
045import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
046import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
047import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
048import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
049import org.apache.hadoop.hbase.exceptions.ScannerResetException;
050import org.apache.hadoop.hbase.ipc.HBaseRpcController;
051import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
058import org.apache.hbase.thirdparty.io.netty.util.Timeout;
059import org.apache.hbase.thirdparty.io.netty.util.Timer;
060
061import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
063import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
068
069/**
070 * Retry caller for scanning a region.
071 * <p>
072 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
073 * reference of this object and use it to open new single region scanners.
074 */
075@InterfaceAudience.Private
076class AsyncScanSingleRegionRpcRetryingCaller {
077
078  private static final Logger LOG =
079    LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
080
081  private final Timer retryTimer;
082
083  private final Scan scan;
084
085  private final ScanMetrics scanMetrics;
086
087  private final long scannerId;
088
089  private final ScanResultCache resultCache;
090
091  private final AdvancedScanResultConsumer consumer;
092
093  private final ClientService.Interface stub;
094
095  private final HRegionLocation loc;
096
097  private final boolean regionServerRemote;
098
099  private final int priority;
100
101  private final long scannerLeaseTimeoutPeriodNs;
102
103  private final int maxAttempts;
104
105  private final long scanTimeoutNs;
106
107  private final long rpcTimeoutNs;
108
109  private final int startLogErrorsCnt;
110
111  private final Runnable completeWhenNoMoreResultsInRegion;
112
113  private final AsyncConnectionImpl conn;
114
115  private final CompletableFuture<Boolean> future;
116
117  private final HBaseRpcController controller;
118
119  private byte[] nextStartRowWhenError;
120
121  private boolean includeNextStartRowWhenError;
122
123  private long nextCallStartNs;
124
125  private int tries;
126
127  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
128
129  private long nextCallSeq = -1L;
130
131  private final HBaseServerExceptionPauseManager pauseManager;
132
133  private enum ScanControllerState {
134    INITIALIZED,
135    SUSPENDED,
136    TERMINATED,
137    DESTROYED
138  }
139
140  // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
141  // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
142  // usage. We use two things to prevent invalid usage:
143  // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an
144  // IllegalStateException if the caller thread is not this thread.
145  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
146  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
147  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
148  // call destroy to get the current state and set the state to DESTROYED. And when user calls
149  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
150  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
151  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
152  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
153  // to be used in the future.
154  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
155  // package private methods can only be called within the implementation of
156  // AsyncScanSingleRegionRpcRetryingCaller.
157  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
158
159    // Make sure the methods are only called in this thread.
160    private final Thread callerThread;
161
162    private final Optional<Cursor> cursor;
163
164    // INITIALIZED -> SUSPENDED -> DESTROYED
165    // INITIALIZED -> TERMINATED -> DESTROYED
166    // INITIALIZED -> DESTROYED
167    // If the state is incorrect we will throw IllegalStateException.
168    private ScanControllerState state = ScanControllerState.INITIALIZED;
169
170    private ScanResumerImpl resumer;
171
172    public ScanControllerImpl(Optional<Cursor> cursor) {
173      this.callerThread = Thread.currentThread();
174      this.cursor = cursor;
175    }
176
177    private void preCheck() {
178      Preconditions.checkState(Thread.currentThread() == callerThread,
179        "The current thread is %s, expected thread is %s, "
180          + "you should not call this method outside onNext or onHeartbeat",
181        Thread.currentThread(), callerThread);
182      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
183        "Invalid Stopper state %s", state);
184    }
185
186    @Override
187    public ScanResumer suspend() {
188      preCheck();
189      state = ScanControllerState.SUSPENDED;
190      ScanResumerImpl resumer = new ScanResumerImpl();
191      this.resumer = resumer;
192      return resumer;
193    }
194
195    @Override
196    public void terminate() {
197      preCheck();
198      state = ScanControllerState.TERMINATED;
199    }
200
201    // return the current state, and set the state to DESTROYED.
202    ScanControllerState destroy() {
203      ScanControllerState state = this.state;
204      this.state = ScanControllerState.DESTROYED;
205      return state;
206    }
207
208    @Override
209    public Optional<Cursor> cursor() {
210      return cursor;
211    }
212  }
213
214  private enum ScanResumerState {
215    INITIALIZED,
216    SUSPENDED,
217    RESUMED
218  }
219
220  // The resume method is allowed to be called in another thread so here we also use the
221  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
222  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
223  // and when user calls resume method, we will change the state to RESUMED. But the resume method
224  // could be called in other thread, and in fact, user could just do this:
225  // controller.suspend().resume()
226  // This is strange but valid. This means the scan could be resumed before we call the prepare
227  // method to do the actual suspend work. So in the resume method, we will check if the state is
228  // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
229  // method, if the state is RESUMED already, we will just return an let the scan go on.
230  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
231  // package private methods can only be called within the implementation of
232  // AsyncScanSingleRegionRpcRetryingCaller.
233  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
234
235    // INITIALIZED -> SUSPENDED -> RESUMED
236    // INITIALIZED -> RESUMED
237    private ScanResumerState state = ScanResumerState.INITIALIZED;
238
239    private ScanResponse resp;
240
241    private int numberOfCompleteRows;
242
243    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
244    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
245    // every time when the previous task is finished. There could also be race as the renewal is
246    // executed in the timer thread, so we also need to check the state before lease renewal. If the
247    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
248    // renewal task.
249    private Timeout leaseRenewer;
250
251    @Override
252    public void resume() {
253      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
254      // just return at the first if condition without loading the resp and numValidResuls field. If
255      // resume is called after suspend, then it is also safe to just reference resp and
256      // numValidResults after the synchronized block as no one will change it anymore.
257      ScanResponse localResp;
258      int localNumberOfCompleteRows;
259      synchronized (this) {
260        if (state == ScanResumerState.INITIALIZED) {
261          // user calls this method before we call prepare, so just set the state to
262          // RESUMED, the implementation will just go on.
263          state = ScanResumerState.RESUMED;
264          return;
265        }
266        if (state == ScanResumerState.RESUMED) {
267          // already resumed, give up.
268          return;
269        }
270        state = ScanResumerState.RESUMED;
271        if (leaseRenewer != null) {
272          leaseRenewer.cancel();
273        }
274        localResp = this.resp;
275        localNumberOfCompleteRows = this.numberOfCompleteRows;
276      }
277      completeOrNext(localResp, localNumberOfCompleteRows);
278    }
279
280    private void scheduleRenewLeaseTask() {
281      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
282        TimeUnit.NANOSECONDS);
283    }
284
285    private synchronized void tryRenewLease() {
286      // the scan has already been resumed, give up
287      if (state == ScanResumerState.RESUMED) {
288        return;
289      }
290      renewLease();
291      // schedule the next renew lease task again as this is a one-time task.
292      scheduleRenewLeaseTask();
293    }
294
295    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
296    // for more details.
297    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
298      if (state == ScanResumerState.RESUMED) {
299        // user calls resume before we actually suspend the scan, just continue;
300        return false;
301      }
302      state = ScanResumerState.SUSPENDED;
303      this.resp = resp;
304      this.numberOfCompleteRows = numberOfCompleteRows;
305      // if there are no more results in region then the scanner at RS side will be closed
306      // automatically so we do not need to renew lease.
307      if (resp.getMoreResultsInRegion()) {
308        // schedule renew lease task
309        scheduleRenewLeaseTask();
310      }
311      return true;
312    }
313  }
314
315  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
316    Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
317    AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
318    boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
319    long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
320    int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
321    this.retryTimer = retryTimer;
322    this.conn = conn;
323    this.scan = scan;
324    this.scanMetrics = scanMetrics;
325    this.scannerId = scannerId;
326    this.resultCache = resultCache;
327    this.consumer = consumer;
328    this.stub = stub;
329    this.loc = loc;
330    this.regionServerRemote = isRegionServerRemote;
331    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
332    this.maxAttempts = maxAttempts;
333    this.scanTimeoutNs = scanTimeoutNs;
334    this.rpcTimeoutNs = rpcTimeoutNs;
335    this.startLogErrorsCnt = startLogErrorsCnt;
336    if (scan.isReversed()) {
337      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
338    } else {
339      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
340    }
341    this.future = new CompletableFuture<>();
342    this.priority = priority;
343    this.controller = conn.rpcControllerFactory.newController();
344    this.controller.setPriority(priority);
345    this.controller.setRequestAttributes(requestAttributes);
346    this.exceptions = new ArrayList<>();
347    this.pauseManager =
348      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
349  }
350
351  private long elapsedMs() {
352    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
353  }
354
355  private void closeScanner() {
356    incRPCCallsMetrics(scanMetrics, regionServerRemote);
357    resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS, loc.getRegion().getTable());
358    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
359    stub.scan(controller, req, resp -> {
360      if (controller.failed()) {
361        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
362          + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
363          + " failed, ignore, probably already closed", controller.getFailed());
364      }
365    });
366  }
367
368  private void completeExceptionally(boolean closeScanner) {
369    resultCache.clear();
370    if (closeScanner) {
371      closeScanner();
372    }
373    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
374  }
375
376  private void completeNoMoreResults() {
377    future.complete(false);
378  }
379
380  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
381    scan.withStartRow(row, inclusive);
382    future.complete(true);
383  }
384
385  private void completeWhenError(boolean closeScanner) {
386    incRPCRetriesMetrics(scanMetrics, closeScanner);
387    resultCache.clear();
388    if (closeScanner) {
389      closeScanner();
390    }
391    if (nextStartRowWhenError != null) {
392      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
393    }
394    future.complete(true);
395  }
396
397  private void onError(Throwable error) {
398    error = translateException(error);
399    if (tries > startLogErrorsCnt) {
400      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
401        + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
402        + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
403        + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
404        + " ms", error);
405    }
406    boolean scannerClosed =
407      error instanceof UnknownScannerException || error instanceof NotServingRegionException
408        || error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
409    RetriesExhaustedException.ThrowableWithExtraContext qt =
410      new RetriesExhaustedException.ThrowableWithExtraContext(error,
411        EnvironmentEdgeManager.currentTime(), "");
412    exceptions.add(qt);
413    if (tries >= maxAttempts) {
414      completeExceptionally(!scannerClosed);
415      return;
416    }
417
418    OptionalLong maybePauseNsToUse =
419      pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
420    if (!maybePauseNsToUse.isPresent()) {
421      completeExceptionally(!scannerClosed);
422      return;
423    }
424    long delayNs = maybePauseNsToUse.getAsLong();
425    if (scannerClosed) {
426      completeWhenError(false);
427      return;
428    }
429    if (error instanceof OutOfOrderScannerNextException) {
430      completeWhenError(true);
431      return;
432    }
433    if (error instanceof DoNotRetryIOException) {
434      completeExceptionally(true);
435      return;
436    }
437    tries++;
438
439    if (HBaseServerException.isServerOverloaded(error)) {
440      Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
441      metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
442    }
443    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
444  }
445
446  private void updateNextStartRowWhenError(Result result) {
447    nextStartRowWhenError = result.getRow();
448    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
449  }
450
451  private void completeWhenNoMoreResultsInRegion() {
452    if (noMoreResultsForScan(scan, loc.getRegion())) {
453      completeNoMoreResults();
454    } else {
455      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
456    }
457  }
458
459  private void completeReversedWhenNoMoreResultsInRegion() {
460    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
461      completeNoMoreResults();
462    } else {
463      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
464    }
465  }
466
467  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
468    if (resp.hasMoreResults() && !resp.getMoreResults()) {
469      // RS tells us there is no more data for the whole scan
470      completeNoMoreResults();
471      return;
472    }
473    if (scan.getLimit() > 0) {
474      // The RS should have set the moreResults field in ScanResponse to false when we have reached
475      // the limit, so we add an assert here.
476      int newLimit = scan.getLimit() - numberOfCompleteRows;
477      assert newLimit > 0;
478      scan.setLimit(newLimit);
479    }
480    // as in 2.0 this value will always be set
481    if (!resp.getMoreResultsInRegion()) {
482      completeWhenNoMoreResultsInRegion.run();
483      return;
484    }
485    next();
486  }
487
488  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
489    if (controller.failed()) {
490      onError(controller.getFailed());
491      return;
492    }
493    updateServerSideMetrics(scanMetrics, resp);
494    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
495    Result[] rawResults;
496    Result[] results;
497    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
498    try {
499      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
500      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
501      results = resultCache.addAndGet(
502        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
503        isHeartbeatMessage);
504    } catch (IOException e) {
505      // We can not retry here. The server has responded normally and the call sequence has been
506      // increased so a new scan with the same call sequence will cause an
507      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
508      LOG.warn("decode scan response failed", e);
509      completeWhenError(true);
510      return;
511    }
512
513    ScanControllerImpl scanController;
514    if (results.length > 0) {
515      scanController = new ScanControllerImpl(
516        resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty());
517      updateNextStartRowWhenError(results[results.length - 1]);
518      consumer.onNext(results, scanController);
519    } else {
520      Optional<Cursor> cursor = Optional.empty();
521      if (resp.hasCursor()) {
522        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
523      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
524        // It is size limit exceed and we need to return the last Result's row.
525        // When user setBatch and the scanner is reopened, the server may return Results that
526        // user has seen and the last Result can not be seen because the number is not enough.
527        // So the row keys of results may not be same, we must use the last one.
528        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
529      }
530      scanController = new ScanControllerImpl(cursor);
531      if (isHeartbeatMessage || cursor.isPresent()) {
532        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
533        // want to pass a cursor to upper layer.
534        consumer.onHeartbeat(scanController);
535      }
536    }
537    ScanControllerState state = scanController.destroy();
538    if (state == ScanControllerState.TERMINATED) {
539      if (resp.getMoreResultsInRegion()) {
540        // we have more results in region but user request to stop the scan, so we need to close the
541        // scanner explicitly.
542        closeScanner();
543      }
544      completeNoMoreResults();
545      return;
546    }
547    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
548    if (state == ScanControllerState.SUSPENDED) {
549      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
550        return;
551      }
552    }
553    completeOrNext(resp, numberOfCompleteRows);
554  }
555
556  private void call() {
557    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
558    // less than the scan timeout. If the server does not respond in time(usually this will not
559    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
560    // resending the next request and the only way to fix this is to close the scanner and open a
561    // new one.
562    long callTimeoutNs;
563    if (scanTimeoutNs > 0) {
564      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
565      if (remainingNs <= 0) {
566        completeExceptionally(true);
567        return;
568      }
569      callTimeoutNs = remainingNs;
570    } else {
571      callTimeoutNs = 0L;
572    }
573    incRPCCallsMetrics(scanMetrics, regionServerRemote);
574    if (tries > 1) {
575      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
576    }
577    resetController(controller, callTimeoutNs, priority, loc.getRegion().getTable());
578    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
579      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
580    final Context context = Context.current();
581    stub.scan(controller, req, resp -> {
582      try (Scope ignored = context.makeCurrent()) {
583        onComplete(controller, resp);
584      }
585    });
586  }
587
588  private void next() {
589    nextCallSeq++;
590    tries = 1;
591    exceptions.clear();
592    nextCallStartNs = System.nanoTime();
593    call();
594  }
595
596  private void renewLease() {
597    incRPCCallsMetrics(scanMetrics, regionServerRemote);
598    nextCallSeq++;
599    resetController(controller, rpcTimeoutNs, priority, loc.getRegion().getTable());
600    ScanRequest req =
601      RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
602    stub.scan(controller, req, resp -> {
603    });
604  }
605
606  /**
607   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
608   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
609   * open scanner request is also needed because we may have some data in the CellScanner which is
610   * contained in the controller.
611   * @return {@code true} if we should continue, otherwise {@code false}.
612   */
613  public CompletableFuture<Boolean> start(HBaseRpcController controller,
614    ScanResponse respWhenOpen) {
615    onComplete(controller, respWhenOpen);
616    return future;
617  }
618}