001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Optional;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.TimeUnit;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.HRegionLocation;
039import org.apache.hadoop.hbase.NotServingRegionException;
040import org.apache.hadoop.hbase.UnknownScannerException;
041import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
042import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
043import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
044import org.apache.hadoop.hbase.exceptions.ScannerResetException;
045import org.apache.hadoop.hbase.ipc.HBaseRpcController;
046import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
053import org.apache.hbase.thirdparty.io.netty.util.Timeout;
054import org.apache.hbase.thirdparty.io.netty.util.Timer;
055
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
058import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
063
064/**
065 * Retry caller for scanning a region.
066 * <p>
067 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
068 * reference of this object and use it to open new single region scanners.
069 */
070@InterfaceAudience.Private
071class AsyncScanSingleRegionRpcRetryingCaller {
072
073  private static final Logger LOG =
074      LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
075
076  private final Timer retryTimer;
077
078  private final Scan scan;
079
080  private final ScanMetrics scanMetrics;
081
082  private final long scannerId;
083
084  private final ScanResultCache resultCache;
085
086  private final AdvancedScanResultConsumer consumer;
087
088  private final ClientService.Interface stub;
089
090  private final HRegionLocation loc;
091
092  private final boolean regionServerRemote;
093
094  private final long scannerLeaseTimeoutPeriodNs;
095
096  private final long pauseNs;
097
098  private final int maxAttempts;
099
100  private final long scanTimeoutNs;
101
102  private final long rpcTimeoutNs;
103
104  private final int startLogErrorsCnt;
105
106  private final Runnable completeWhenNoMoreResultsInRegion;
107
108  private final CompletableFuture<Boolean> future;
109
110  private final HBaseRpcController controller;
111
112  private byte[] nextStartRowWhenError;
113
114  private boolean includeNextStartRowWhenError;
115
116  private long nextCallStartNs;
117
118  private int tries;
119
120  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
121
122  private long nextCallSeq = -1L;
123
124  private enum ScanControllerState {
125    INITIALIZED, SUSPENDED, TERMINATED, DESTROYED
126  }
127
  // Since suspend and terminate should only be called within onNext or onHeartbeat (see the
  // comments of AdvancedScanResultConsumer.onNext and onHeartbeat), we need to add some checks to
  // prevent invalid usage. We use two things to prevent invalid usage:
  // 1. Record the thread that constructs the ScanControllerImpl instance. We will throw an
  // IllegalStateException if the caller thread is not this thread.
  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
  // call destroy to get the current state and set the state to DESTROYED. And when user calls
  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
  // from being used in the future.
  // Notice that, the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
145  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
146
147    // Make sure the methods are only called in this thread.
148    private final Thread callerThread;
149
150    private final Optional<Cursor> cursor;
151
152    // INITIALIZED -> SUSPENDED -> DESTROYED
153    // INITIALIZED -> TERMINATED -> DESTROYED
154    // INITIALIZED -> DESTROYED
155    // If the state is incorrect we will throw IllegalStateException.
156    private ScanControllerState state = ScanControllerState.INITIALIZED;
157
158    private ScanResumerImpl resumer;
159
160    public ScanControllerImpl(Optional<Cursor> cursor) {
161      this.callerThread = Thread.currentThread();
162      this.cursor = cursor;
163    }
164
165    private void preCheck() {
166      Preconditions.checkState(Thread.currentThread() == callerThread,
167        "The current thread is %s, expected thread is %s, " +
168            "you should not call this method outside onNext or onHeartbeat",
169        Thread.currentThread(), callerThread);
170      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
171        "Invalid Stopper state %s", state);
172    }
173
174    @Override
175    public ScanResumer suspend() {
176      preCheck();
177      state = ScanControllerState.SUSPENDED;
178      ScanResumerImpl resumer = new ScanResumerImpl();
179      this.resumer = resumer;
180      return resumer;
181    }
182
183    @Override
184    public void terminate() {
185      preCheck();
186      state = ScanControllerState.TERMINATED;
187    }
188
189    // return the current state, and set the state to DESTROYED.
190    ScanControllerState destroy() {
191      ScanControllerState state = this.state;
192      this.state = ScanControllerState.DESTROYED;
193      return state;
194    }
195
196    @Override
197    public Optional<Cursor> cursor() {
198        return cursor;
199    }
200  }
201
202  private enum ScanResumerState {
203    INITIALIZED, SUSPENDED, RESUMED
204  }
205
  // The resume method is allowed to be called in another thread so here we also use the
  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
  // and when user calls resume method, we will change the state to RESUMED. But the resume method
  // could be called in other thread, and in fact, user could just do this:
  // controller.suspend().resume()
  // This is strange but valid. This means the scan could be resumed before we call the prepare
  // method to do the actual suspend work. So in the resume method, we will check if the state is
  // INITIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
  // method, if the state is RESUMED already, we will just return and let the scan go on.
  // Notice that, the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
219  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
220
221    // INITIALIZED -> SUSPENDED -> RESUMED
222    // INITIALIZED -> RESUMED
223    private ScanResumerState state = ScanResumerState.INITIALIZED;
224
225    private ScanResponse resp;
226
227    private int numberOfCompleteRows;
228
229    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
230    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
231    // every time when the previous task is finished. There could also be race as the renewal is
232    // executed in the timer thread, so we also need to check the state before lease renewal. If the
233    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
234    // renewal task.
235    private Timeout leaseRenewer;
236
237    @Override
238    public void resume() {
239      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
240      // just return at the first if condition without loading the resp and numValidResuls field. If
241      // resume is called after suspend, then it is also safe to just reference resp and
242      // numValidResults after the synchronized block as no one will change it anymore.
243      ScanResponse localResp;
244      int localNumberOfCompleteRows;
245      synchronized (this) {
246        if (state == ScanResumerState.INITIALIZED) {
247          // user calls this method before we call prepare, so just set the state to
248          // RESUMED, the implementation will just go on.
249          state = ScanResumerState.RESUMED;
250          return;
251        }
252        if (state == ScanResumerState.RESUMED) {
253          // already resumed, give up.
254          return;
255        }
256        state = ScanResumerState.RESUMED;
257        if (leaseRenewer != null) {
258          leaseRenewer.cancel();
259        }
260        localResp = this.resp;
261        localNumberOfCompleteRows = this.numberOfCompleteRows;
262      }
263      completeOrNext(localResp, localNumberOfCompleteRows);
264    }
265
266    private void scheduleRenewLeaseTask() {
267      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
268        TimeUnit.NANOSECONDS);
269    }
270
271    private synchronized void tryRenewLease() {
272      // the scan has already been resumed, give up
273      if (state == ScanResumerState.RESUMED) {
274        return;
275      }
276      renewLease();
277      // schedule the next renew lease task again as this is a one-time task.
278      scheduleRenewLeaseTask();
279    }
280
281    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
282    // for more details.
283    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
284      if (state == ScanResumerState.RESUMED) {
285        // user calls resume before we actually suspend the scan, just continue;
286        return false;
287      }
288      state = ScanResumerState.SUSPENDED;
289      this.resp = resp;
290      this.numberOfCompleteRows = numberOfCompleteRows;
291      // if there are no more results in region then the scanner at RS side will be closed
292      // automatically so we do not need to renew lease.
293      if (resp.getMoreResultsInRegion()) {
294        // schedule renew lease task
295        scheduleRenewLeaseTask();
296      }
297      return true;
298    }
299  }
300
301  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer,
302      AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
303      ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub,
304      HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
305      long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
306    this.retryTimer = retryTimer;
307    this.scan = scan;
308    this.scanMetrics = scanMetrics;
309    this.scannerId = scannerId;
310    this.resultCache = resultCache;
311    this.consumer = consumer;
312    this.stub = stub;
313    this.loc = loc;
314    this.regionServerRemote = isRegionServerRemote;
315    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
316    this.pauseNs = pauseNs;
317    this.maxAttempts = maxAttempts;
318    this.scanTimeoutNs = scanTimeoutNs;
319    this.rpcTimeoutNs = rpcTimeoutNs;
320    this.startLogErrorsCnt = startLogErrorsCnt;
321    if (scan.isReversed()) {
322      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
323    } else {
324      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
325    }
326    this.future = new CompletableFuture<>();
327    this.controller = conn.rpcControllerFactory.newController();
328    this.exceptions = new ArrayList<>();
329  }
330
331  private long elapsedMs() {
332    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
333  }
334
335  private long remainingTimeNs() {
336    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
337  }
338
339  private void closeScanner() {
340    incRPCCallsMetrics(scanMetrics, regionServerRemote);
341    resetController(controller, rpcTimeoutNs);
342    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
343    stub.scan(controller, req, resp -> {
344      if (controller.failed()) {
345        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId +
346            " for " + loc.getRegion().getEncodedName() + " of " +
347            loc.getRegion().getTable() + " failed, ignore, probably already closed",
348          controller.getFailed());
349      }
350    });
351  }
352
353  private void completeExceptionally(boolean closeScanner) {
354    resultCache.clear();
355    if (closeScanner) {
356      closeScanner();
357    }
358    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
359  }
360
361  private void completeNoMoreResults() {
362    future.complete(false);
363  }
364
365  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
366    scan.withStartRow(row, inclusive);
367    future.complete(true);
368  }
369
370  private void completeWhenError(boolean closeScanner) {
371    incRPCRetriesMetrics(scanMetrics, closeScanner);
372    resultCache.clear();
373    if (closeScanner) {
374      closeScanner();
375    }
376    if (nextStartRowWhenError != null) {
377      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
378    }
379    future.complete(true);
380  }
381
382  private void onError(Throwable error) {
383    error = translateException(error);
384    if (tries > startLogErrorsCnt) {
385      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " +
386          loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() +
387          " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " +
388          TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() +
389          " ms",
390        error);
391    }
392    boolean scannerClosed =
393      error instanceof UnknownScannerException || error instanceof NotServingRegionException ||
394        error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
395    RetriesExhaustedException.ThrowableWithExtraContext qt =
396      new RetriesExhaustedException.ThrowableWithExtraContext(error,
397        EnvironmentEdgeManager.currentTime(), "");
398    exceptions.add(qt);
399    if (tries >= maxAttempts) {
400      completeExceptionally(!scannerClosed);
401      return;
402    }
403    long delayNs;
404    if (scanTimeoutNs > 0) {
405      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
406      if (maxDelayNs <= 0) {
407        completeExceptionally(!scannerClosed);
408        return;
409      }
410      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
411    } else {
412      delayNs = getPauseTime(pauseNs, tries - 1);
413    }
414    if (scannerClosed) {
415      completeWhenError(false);
416      return;
417    }
418    if (error instanceof OutOfOrderScannerNextException) {
419      completeWhenError(true);
420      return;
421    }
422    if (error instanceof DoNotRetryIOException) {
423      completeExceptionally(true);
424      return;
425    }
426    tries++;
427    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
428  }
429
430  private void updateNextStartRowWhenError(Result result) {
431    nextStartRowWhenError = result.getRow();
432    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
433  }
434
435  private void completeWhenNoMoreResultsInRegion() {
436    if (noMoreResultsForScan(scan, loc.getRegion())) {
437      completeNoMoreResults();
438    } else {
439      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
440    }
441  }
442
443  private void completeReversedWhenNoMoreResultsInRegion() {
444    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
445      completeNoMoreResults();
446    } else {
447      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
448    }
449  }
450
451  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
452    if (resp.hasMoreResults() && !resp.getMoreResults()) {
453      // RS tells us there is no more data for the whole scan
454      completeNoMoreResults();
455      return;
456    }
457    if (scan.getLimit() > 0) {
458      // The RS should have set the moreResults field in ScanResponse to false when we have reached
459      // the limit, so we add an assert here.
460      int newLimit = scan.getLimit() - numberOfCompleteRows;
461      assert newLimit > 0;
462      scan.setLimit(newLimit);
463    }
464    // as in 2.0 this value will always be set
465    if (!resp.getMoreResultsInRegion()) {
466      completeWhenNoMoreResultsInRegion.run();
467      return;
468    }
469    next();
470  }
471
472  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
473    if (controller.failed()) {
474      onError(controller.getFailed());
475      return;
476    }
477    updateServerSideMetrics(scanMetrics, resp);
478    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
479    Result[] rawResults;
480    Result[] results;
481    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
482    try {
483      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
484      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
485      results = resultCache.addAndGet(
486        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
487        isHeartbeatMessage);
488    } catch (IOException e) {
489      // We can not retry here. The server has responded normally and the call sequence has been
490      // increased so a new scan with the same call sequence will cause an
491      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
492      LOG.warn("decode scan response failed", e);
493      completeWhenError(true);
494      return;
495    }
496
497    ScanControllerImpl scanController;
498    if (results.length > 0) {
499      scanController = new ScanControllerImpl(
500          resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
501              : Optional.empty());
502      updateNextStartRowWhenError(results[results.length - 1]);
503      consumer.onNext(results, scanController);
504    } else {
505      Optional<Cursor> cursor = Optional.empty();
506      if (resp.hasCursor()) {
507        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
508      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
509        // It is size limit exceed and we need to return the last Result's row.
510        // When user setBatch and the scanner is reopened, the server may return Results that
511        // user has seen and the last Result can not be seen because the number is not enough.
512        // So the row keys of results may not be same, we must use the last one.
513        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
514      }
515      scanController = new ScanControllerImpl(cursor);
516      if (isHeartbeatMessage || cursor.isPresent()) {
517        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
518        // want to pass a cursor to upper layer.
519        consumer.onHeartbeat(scanController);
520      }
521    }
522    ScanControllerState state = scanController.destroy();
523    if (state == ScanControllerState.TERMINATED) {
524      if (resp.getMoreResultsInRegion()) {
525        // we have more results in region but user request to stop the scan, so we need to close the
526        // scanner explicitly.
527        closeScanner();
528      }
529      completeNoMoreResults();
530      return;
531    }
532    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
533    if (state == ScanControllerState.SUSPENDED) {
534      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
535        return;
536      }
537    }
538    completeOrNext(resp, numberOfCompleteRows);
539  }
540
541  private void call() {
542    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
543    // less than the scan timeout. If the server does not respond in time(usually this will not
544    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
545    // resending the next request and the only way to fix this is to close the scanner and open a
546    // new one.
547    long callTimeoutNs;
548    if (scanTimeoutNs > 0) {
549      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
550      if (remainingNs <= 0) {
551        completeExceptionally(true);
552        return;
553      }
554      callTimeoutNs = remainingNs;
555    } else {
556      callTimeoutNs = 0L;
557    }
558    incRPCCallsMetrics(scanMetrics, regionServerRemote);
559    if (tries > 1) {
560      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
561    }
562    resetController(controller, callTimeoutNs);
563    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
564      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
565    stub.scan(controller, req, resp -> onComplete(controller, resp));
566  }
567
568  private void next() {
569    nextCallSeq++;
570    tries = 1;
571    exceptions.clear();
572    nextCallStartNs = System.nanoTime();
573    call();
574  }
575
576  private void renewLease() {
577    incRPCCallsMetrics(scanMetrics, regionServerRemote);
578    nextCallSeq++;
579    resetController(controller, rpcTimeoutNs);
580    ScanRequest req =
581        RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
582    stub.scan(controller, req, resp -> {
583    });
584  }
585
586  /**
587   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
588   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
589   * open scanner request is also needed because we may have some data in the CellScanner which is
590   * contained in the controller.
591   * @return {@code true} if we should continue, otherwise {@code false}.
592   */
593  public CompletableFuture<Boolean> start(HBaseRpcController controller,
594      ScanResponse respWhenOpen) {
595    onComplete(controller, respWhenOpen);
596    return future;
597  }
598}