001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Optional;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.TimeUnit;
037import org.apache.hadoop.hbase.CallQueueTooBigException;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.NotServingRegionException;
041import org.apache.hadoop.hbase.UnknownScannerException;
042import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
043import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
044import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
045import org.apache.hadoop.hbase.exceptions.ScannerResetException;
046import org.apache.hadoop.hbase.ipc.HBaseRpcController;
047import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
054import org.apache.hbase.thirdparty.io.netty.util.Timeout;
055import org.apache.hbase.thirdparty.io.netty.util.Timer;
056
057import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
058import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
059import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
064
065/**
066 * Retry caller for scanning a region.
067 * <p>
068 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
069 * reference of this object and use it to open new single region scanners.
070 */
071@InterfaceAudience.Private
072class AsyncScanSingleRegionRpcRetryingCaller {
073
074  private static final Logger LOG =
075      LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
076
077  private final Timer retryTimer;
078
079  private final Scan scan;
080
081  private final ScanMetrics scanMetrics;
082
083  private final long scannerId;
084
085  private final ScanResultCache resultCache;
086
087  private final AdvancedScanResultConsumer consumer;
088
089  private final ClientService.Interface stub;
090
091  private final HRegionLocation loc;
092
093  private final boolean regionServerRemote;
094
095  private final int priority;
096
097  private final long scannerLeaseTimeoutPeriodNs;
098
099  private final long pauseNs;
100
101  private final long pauseForCQTBENs;
102
103  private final int maxAttempts;
104
105  private final long scanTimeoutNs;
106
107  private final long rpcTimeoutNs;
108
109  private final int startLogErrorsCnt;
110
111  private final Runnable completeWhenNoMoreResultsInRegion;
112
113  private final CompletableFuture<Boolean> future;
114
115  private final HBaseRpcController controller;
116
117  private byte[] nextStartRowWhenError;
118
119  private boolean includeNextStartRowWhenError;
120
121  private long nextCallStartNs;
122
123  private int tries;
124
125  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
126
127  private long nextCallSeq = -1L;
128
129  private enum ScanControllerState {
130    INITIALIZED, SUSPENDED, TERMINATED, DESTROYED
131  }
132
133  // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
134  // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
135  // usage. We use two things to prevent invalid usage:
136  // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an
137  // IllegalStateException if the caller thread is not this thread.
138  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
139  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
140  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
141  // call destroy to get the current state and set the state to DESTROYED. And when user calls
142  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
143  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
144  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
145  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
146  // to be used in the future.
147  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
148  // package private methods can only be called within the implementation of
149  // AsyncScanSingleRegionRpcRetryingCaller.
150  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
151
152    // Make sure the methods are only called in this thread.
153    private final Thread callerThread;
154
155    private final Optional<Cursor> cursor;
156
157    // INITIALIZED -> SUSPENDED -> DESTROYED
158    // INITIALIZED -> TERMINATED -> DESTROYED
159    // INITIALIZED -> DESTROYED
160    // If the state is incorrect we will throw IllegalStateException.
161    private ScanControllerState state = ScanControllerState.INITIALIZED;
162
163    private ScanResumerImpl resumer;
164
165    public ScanControllerImpl(Optional<Cursor> cursor) {
166      this.callerThread = Thread.currentThread();
167      this.cursor = cursor;
168    }
169
170    private void preCheck() {
171      Preconditions.checkState(Thread.currentThread() == callerThread,
172        "The current thread is %s, expected thread is %s, " +
173            "you should not call this method outside onNext or onHeartbeat",
174        Thread.currentThread(), callerThread);
175      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
176        "Invalid Stopper state %s", state);
177    }
178
179    @Override
180    public ScanResumer suspend() {
181      preCheck();
182      state = ScanControllerState.SUSPENDED;
183      ScanResumerImpl resumer = new ScanResumerImpl();
184      this.resumer = resumer;
185      return resumer;
186    }
187
188    @Override
189    public void terminate() {
190      preCheck();
191      state = ScanControllerState.TERMINATED;
192    }
193
194    // return the current state, and set the state to DESTROYED.
195    ScanControllerState destroy() {
196      ScanControllerState state = this.state;
197      this.state = ScanControllerState.DESTROYED;
198      return state;
199    }
200
201    @Override
202    public Optional<Cursor> cursor() {
203        return cursor;
204    }
205  }
206
207  private enum ScanResumerState {
208    INITIALIZED, SUSPENDED, RESUMED
209  }
210
211  // The resume method is allowed to be called in another thread so here we also use the
212  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
213  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
214  // and when user calls resume method, we will change the state to RESUMED. But the resume method
215  // could be called in other thread, and in fact, user could just do this:
216  // controller.suspend().resume()
217  // This is strange but valid. This means the scan could be resumed before we call the prepare
218  // method to do the actual suspend work. So in the resume method, we will check if the state is
219  // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
220  // method, if the state is RESUMED already, we will just return an let the scan go on.
221  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
222  // package private methods can only be called within the implementation of
223  // AsyncScanSingleRegionRpcRetryingCaller.
224  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
225
226    // INITIALIZED -> SUSPENDED -> RESUMED
227    // INITIALIZED -> RESUMED
228    private ScanResumerState state = ScanResumerState.INITIALIZED;
229
230    private ScanResponse resp;
231
232    private int numberOfCompleteRows;
233
234    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
235    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
236    // every time when the previous task is finished. There could also be race as the renewal is
237    // executed in the timer thread, so we also need to check the state before lease renewal. If the
238    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
239    // renewal task.
240    private Timeout leaseRenewer;
241
242    @Override
243    public void resume() {
244      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
245      // just return at the first if condition without loading the resp and numValidResuls field. If
246      // resume is called after suspend, then it is also safe to just reference resp and
247      // numValidResults after the synchronized block as no one will change it anymore.
248      ScanResponse localResp;
249      int localNumberOfCompleteRows;
250      synchronized (this) {
251        if (state == ScanResumerState.INITIALIZED) {
252          // user calls this method before we call prepare, so just set the state to
253          // RESUMED, the implementation will just go on.
254          state = ScanResumerState.RESUMED;
255          return;
256        }
257        if (state == ScanResumerState.RESUMED) {
258          // already resumed, give up.
259          return;
260        }
261        state = ScanResumerState.RESUMED;
262        if (leaseRenewer != null) {
263          leaseRenewer.cancel();
264        }
265        localResp = this.resp;
266        localNumberOfCompleteRows = this.numberOfCompleteRows;
267      }
268      completeOrNext(localResp, localNumberOfCompleteRows);
269    }
270
271    private void scheduleRenewLeaseTask() {
272      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
273        TimeUnit.NANOSECONDS);
274    }
275
276    private synchronized void tryRenewLease() {
277      // the scan has already been resumed, give up
278      if (state == ScanResumerState.RESUMED) {
279        return;
280      }
281      renewLease();
282      // schedule the next renew lease task again as this is a one-time task.
283      scheduleRenewLeaseTask();
284    }
285
286    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
287    // for more details.
288    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
289      if (state == ScanResumerState.RESUMED) {
290        // user calls resume before we actually suspend the scan, just continue;
291        return false;
292      }
293      state = ScanResumerState.SUSPENDED;
294      this.resp = resp;
295      this.numberOfCompleteRows = numberOfCompleteRows;
296      // if there are no more results in region then the scanner at RS side will be closed
297      // automatically so we do not need to renew lease.
298      if (resp.getMoreResultsInRegion()) {
299        // schedule renew lease task
300        scheduleRenewLeaseTask();
301      }
302      return true;
303    }
304  }
305
306  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
307      Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
308      AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
309      boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
310      long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
311      int startLogErrorsCnt) {
312    this.retryTimer = retryTimer;
313    this.scan = scan;
314    this.scanMetrics = scanMetrics;
315    this.scannerId = scannerId;
316    this.resultCache = resultCache;
317    this.consumer = consumer;
318    this.stub = stub;
319    this.loc = loc;
320    this.regionServerRemote = isRegionServerRemote;
321    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
322    this.pauseNs = pauseNs;
323    this.pauseForCQTBENs = pauseForCQTBENs;
324    this.maxAttempts = maxAttempts;
325    this.scanTimeoutNs = scanTimeoutNs;
326    this.rpcTimeoutNs = rpcTimeoutNs;
327    this.startLogErrorsCnt = startLogErrorsCnt;
328    if (scan.isReversed()) {
329      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
330    } else {
331      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
332    }
333    this.future = new CompletableFuture<>();
334    this.priority = priority;
335    this.controller = conn.rpcControllerFactory.newController();
336    this.controller.setPriority(priority);
337    this.exceptions = new ArrayList<>();
338  }
339
340  private long elapsedMs() {
341    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
342  }
343
344  private long remainingTimeNs() {
345    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
346  }
347
348  private void closeScanner() {
349    incRPCCallsMetrics(scanMetrics, regionServerRemote);
350    resetController(controller, rpcTimeoutNs, priority);
351    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
352    stub.scan(controller, req, resp -> {
353      if (controller.failed()) {
354        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId +
355            " for " + loc.getRegion().getEncodedName() + " of " +
356            loc.getRegion().getTable() + " failed, ignore, probably already closed",
357          controller.getFailed());
358      }
359    });
360  }
361
362  private void completeExceptionally(boolean closeScanner) {
363    resultCache.clear();
364    if (closeScanner) {
365      closeScanner();
366    }
367    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
368  }
369
370  private void completeNoMoreResults() {
371    future.complete(false);
372  }
373
374  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
375    scan.withStartRow(row, inclusive);
376    future.complete(true);
377  }
378
379  private void completeWhenError(boolean closeScanner) {
380    incRPCRetriesMetrics(scanMetrics, closeScanner);
381    resultCache.clear();
382    if (closeScanner) {
383      closeScanner();
384    }
385    if (nextStartRowWhenError != null) {
386      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
387    }
388    future.complete(true);
389  }
390
391  private void onError(Throwable error) {
392    error = translateException(error);
393    if (tries > startLogErrorsCnt) {
394      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " +
395          loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() +
396          " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " +
397          TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() +
398          " ms",
399        error);
400    }
401    boolean scannerClosed =
402      error instanceof UnknownScannerException || error instanceof NotServingRegionException ||
403        error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
404    RetriesExhaustedException.ThrowableWithExtraContext qt =
405      new RetriesExhaustedException.ThrowableWithExtraContext(error,
406        EnvironmentEdgeManager.currentTime(), "");
407    exceptions.add(qt);
408    if (tries >= maxAttempts) {
409      completeExceptionally(!scannerClosed);
410      return;
411    }
412    long delayNs;
413    long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
414    if (scanTimeoutNs > 0) {
415      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
416      if (maxDelayNs <= 0) {
417        completeExceptionally(!scannerClosed);
418        return;
419      }
420      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
421    } else {
422      delayNs = getPauseTime(pauseNsToUse, tries - 1);
423    }
424    if (scannerClosed) {
425      completeWhenError(false);
426      return;
427    }
428    if (error instanceof OutOfOrderScannerNextException) {
429      completeWhenError(true);
430      return;
431    }
432    if (error instanceof DoNotRetryIOException) {
433      completeExceptionally(true);
434      return;
435    }
436    tries++;
437    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
438  }
439
440  private void updateNextStartRowWhenError(Result result) {
441    nextStartRowWhenError = result.getRow();
442    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
443  }
444
445  private void completeWhenNoMoreResultsInRegion() {
446    if (noMoreResultsForScan(scan, loc.getRegion())) {
447      completeNoMoreResults();
448    } else {
449      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
450    }
451  }
452
453  private void completeReversedWhenNoMoreResultsInRegion() {
454    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
455      completeNoMoreResults();
456    } else {
457      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
458    }
459  }
460
461  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
462    if (resp.hasMoreResults() && !resp.getMoreResults()) {
463      // RS tells us there is no more data for the whole scan
464      completeNoMoreResults();
465      return;
466    }
467    if (scan.getLimit() > 0) {
468      // The RS should have set the moreResults field in ScanResponse to false when we have reached
469      // the limit, so we add an assert here.
470      int newLimit = scan.getLimit() - numberOfCompleteRows;
471      assert newLimit > 0;
472      scan.setLimit(newLimit);
473    }
474    // as in 2.0 this value will always be set
475    if (!resp.getMoreResultsInRegion()) {
476      completeWhenNoMoreResultsInRegion.run();
477      return;
478    }
479    next();
480  }
481
482  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
483    if (controller.failed()) {
484      onError(controller.getFailed());
485      return;
486    }
487    updateServerSideMetrics(scanMetrics, resp);
488    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
489    Result[] rawResults;
490    Result[] results;
491    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
492    try {
493      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
494      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
495      results = resultCache.addAndGet(
496        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
497        isHeartbeatMessage);
498    } catch (IOException e) {
499      // We can not retry here. The server has responded normally and the call sequence has been
500      // increased so a new scan with the same call sequence will cause an
501      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
502      LOG.warn("decode scan response failed", e);
503      completeWhenError(true);
504      return;
505    }
506
507    ScanControllerImpl scanController;
508    if (results.length > 0) {
509      scanController = new ScanControllerImpl(
510          resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
511              : Optional.empty());
512      updateNextStartRowWhenError(results[results.length - 1]);
513      consumer.onNext(results, scanController);
514    } else {
515      Optional<Cursor> cursor = Optional.empty();
516      if (resp.hasCursor()) {
517        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
518      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
519        // It is size limit exceed and we need to return the last Result's row.
520        // When user setBatch and the scanner is reopened, the server may return Results that
521        // user has seen and the last Result can not be seen because the number is not enough.
522        // So the row keys of results may not be same, we must use the last one.
523        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
524      }
525      scanController = new ScanControllerImpl(cursor);
526      if (isHeartbeatMessage || cursor.isPresent()) {
527        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
528        // want to pass a cursor to upper layer.
529        consumer.onHeartbeat(scanController);
530      }
531    }
532    ScanControllerState state = scanController.destroy();
533    if (state == ScanControllerState.TERMINATED) {
534      if (resp.getMoreResultsInRegion()) {
535        // we have more results in region but user request to stop the scan, so we need to close the
536        // scanner explicitly.
537        closeScanner();
538      }
539      completeNoMoreResults();
540      return;
541    }
542    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
543    if (state == ScanControllerState.SUSPENDED) {
544      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
545        return;
546      }
547    }
548    completeOrNext(resp, numberOfCompleteRows);
549  }
550
551  private void call() {
552    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
553    // less than the scan timeout. If the server does not respond in time(usually this will not
554    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
555    // resending the next request and the only way to fix this is to close the scanner and open a
556    // new one.
557    long callTimeoutNs;
558    if (scanTimeoutNs > 0) {
559      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
560      if (remainingNs <= 0) {
561        completeExceptionally(true);
562        return;
563      }
564      callTimeoutNs = remainingNs;
565    } else {
566      callTimeoutNs = 0L;
567    }
568    incRPCCallsMetrics(scanMetrics, regionServerRemote);
569    if (tries > 1) {
570      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
571    }
572    resetController(controller, callTimeoutNs, priority);
573    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
574      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
575    stub.scan(controller, req, resp -> onComplete(controller, resp));
576  }
577
578  private void next() {
579    nextCallSeq++;
580    tries = 1;
581    exceptions.clear();
582    nextCallStartNs = System.nanoTime();
583    call();
584  }
585
586  private void renewLease() {
587    incRPCCallsMetrics(scanMetrics, regionServerRemote);
588    nextCallSeq++;
589    resetController(controller, rpcTimeoutNs, priority);
590    ScanRequest req =
591        RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
592    stub.scan(controller, req, resp -> {
593    });
594  }
595
596  /**
597   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
598   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
599   * open scanner request is also needed because we may have some data in the CellScanner which is
600   * contained in the controller.
601   * @return {@code true} if we should continue, otherwise {@code false}.
602   */
603  public CompletableFuture<Boolean> start(HBaseRpcController controller,
604      ScanResponse respWhenOpen) {
605    onComplete(controller, respWhenOpen);
606    return future;
607  }
608}