001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
027import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
029import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Optional;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.TimeUnit;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.HRegionLocation;
039import org.apache.hadoop.hbase.NotServingRegionException;
040import org.apache.hadoop.hbase.UnknownScannerException;
041import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
042import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
043import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
044import org.apache.hadoop.hbase.exceptions.ScannerResetException;
045import org.apache.hadoop.hbase.ipc.HBaseRpcController;
046import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
053import org.apache.hbase.thirdparty.io.netty.util.Timeout;
054import org.apache.hbase.thirdparty.io.netty.util.Timer;
055
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
058import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
063
064/**
065 * Retry caller for scanning a region.
066 * <p>
067 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
068 * reference of this object and use it to open new single region scanners.
069 */
070@InterfaceAudience.Private
071class AsyncScanSingleRegionRpcRetryingCaller {
072
073  private static final Logger LOG =
074      LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
075
076  private final Timer retryTimer;
077
078  private final Scan scan;
079
080  private final ScanMetrics scanMetrics;
081
082  private final long scannerId;
083
084  private final ScanResultCache resultCache;
085
086  private final AdvancedScanResultConsumer consumer;
087
088  private final ClientService.Interface stub;
089
090  private final HRegionLocation loc;
091
092  private final boolean regionServerRemote;
093
094  private final long scannerLeaseTimeoutPeriodNs;
095
096  private final long pauseNs;
097
098  private final int maxAttempts;
099
100  private final long scanTimeoutNs;
101
102  private final long rpcTimeoutNs;
103
104  private final int startLogErrorsCnt;
105
106  private final Runnable completeWhenNoMoreResultsInRegion;
107
108  private final CompletableFuture<Boolean> future;
109
110  private final HBaseRpcController controller;
111
112  private byte[] nextStartRowWhenError;
113
114  private boolean includeNextStartRowWhenError;
115
116  private long nextCallStartNs;
117
118  private int tries;
119
120  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
121
122  private long nextCallSeq = -1L;
123
124  private enum ScanControllerState {
125    INITIALIZED, SUSPENDED, TERMINATED, DESTROYED
126  }
127
128  // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
129  // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
130  // usage. We use two things to prevent invalid usage:
131  // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an
132  // IllegalStateException if the caller thread is not this thread.
133  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
134  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
135  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
136  // call destroy to get the current state and set the state to DESTROYED. And when user calls
137  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
138  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
139  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
140  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
141  // to be used in the future.
142  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
143  // package private methods can only be called within the implementation of
144  // AsyncScanSingleRegionRpcRetryingCaller.
145  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
146
147    // Make sure the methods are only called in this thread.
148    private final Thread callerThread;
149
150    private final Optional<Cursor> cursor;
151
152    // INITIALIZED -> SUSPENDED -> DESTROYED
153    // INITIALIZED -> TERMINATED -> DESTROYED
154    // INITIALIZED -> DESTROYED
155    // If the state is incorrect we will throw IllegalStateException.
156    private ScanControllerState state = ScanControllerState.INITIALIZED;
157
158    private ScanResumerImpl resumer;
159
160    public ScanControllerImpl(Optional<Cursor> cursor) {
161      this.callerThread = Thread.currentThread();
162      this.cursor = cursor;
163    }
164
165    private void preCheck() {
166      Preconditions.checkState(Thread.currentThread() == callerThread,
167        "The current thread is %s, expected thread is %s, " +
168            "you should not call this method outside onNext or onHeartbeat",
169        Thread.currentThread(), callerThread);
170      Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
171        "Invalid Stopper state %s", state);
172    }
173
174    @Override
175    public ScanResumer suspend() {
176      preCheck();
177      state = ScanControllerState.SUSPENDED;
178      ScanResumerImpl resumer = new ScanResumerImpl();
179      this.resumer = resumer;
180      return resumer;
181    }
182
183    @Override
184    public void terminate() {
185      preCheck();
186      state = ScanControllerState.TERMINATED;
187    }
188
189    // return the current state, and set the state to DESTROYED.
190    ScanControllerState destroy() {
191      ScanControllerState state = this.state;
192      this.state = ScanControllerState.DESTROYED;
193      return state;
194    }
195
196    @Override
197    public Optional<Cursor> cursor() {
198        return cursor;
199    }
200  }
201
202  private enum ScanResumerState {
203    INITIALIZED, SUSPENDED, RESUMED
204  }
205
206  // The resume method is allowed to be called in another thread so here we also use the
207  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
208  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
209  // and when user calls resume method, we will change the state to RESUMED. But the resume method
210  // could be called in other thread, and in fact, user could just do this:
211  // controller.suspend().resume()
212  // This is strange but valid. This means the scan could be resumed before we call the prepare
213  // method to do the actual suspend work. So in the resume method, we will check if the state is
214  // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
215  // method, if the state is RESUMED already, we will just return an let the scan go on.
216  // Notice that, the public methods of this class is supposed to be called by upper layer only, and
217  // package private methods can only be called within the implementation of
218  // AsyncScanSingleRegionRpcRetryingCaller.
219  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
220
221    // INITIALIZED -> SUSPENDED -> RESUMED
222    // INITIALIZED -> RESUMED
223    private ScanResumerState state = ScanResumerState.INITIALIZED;
224
225    private ScanResponse resp;
226
227    private int numberOfCompleteRows;
228
229    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
230    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
231    // every time when the previous task is finished. There could also be race as the renewal is
232    // executed in the timer thread, so we also need to check the state before lease renewal. If the
233    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
234    // renewal task.
235    private Timeout leaseRenewer;
236
237    @Override
238    public void resume() {
239      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
240      // just return at the first if condition without loading the resp and numValidResuls field. If
241      // resume is called after suspend, then it is also safe to just reference resp and
242      // numValidResults after the synchronized block as no one will change it anymore.
243      ScanResponse localResp;
244      int localNumberOfCompleteRows;
245      synchronized (this) {
246        if (state == ScanResumerState.INITIALIZED) {
247          // user calls this method before we call prepare, so just set the state to
248          // RESUMED, the implementation will just go on.
249          state = ScanResumerState.RESUMED;
250          return;
251        }
252        if (state == ScanResumerState.RESUMED) {
253          // already resumed, give up.
254          return;
255        }
256        state = ScanResumerState.RESUMED;
257        if (leaseRenewer != null) {
258          leaseRenewer.cancel();
259        }
260        localResp = this.resp;
261        localNumberOfCompleteRows = this.numberOfCompleteRows;
262      }
263      completeOrNext(localResp, localNumberOfCompleteRows);
264    }
265
266    private void scheduleRenewLeaseTask() {
267      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
268        TimeUnit.NANOSECONDS);
269    }
270
271    private synchronized void tryRenewLease() {
272      // the scan has already been resumed, give up
273      if (state == ScanResumerState.RESUMED) {
274        return;
275      }
276      renewLease();
277      // schedule the next renew lease task again as this is a one-time task.
278      scheduleRenewLeaseTask();
279    }
280
281    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
282    // for more details.
283    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
284      if (state == ScanResumerState.RESUMED) {
285        // user calls resume before we actually suspend the scan, just continue;
286        return false;
287      }
288      state = ScanResumerState.SUSPENDED;
289      this.resp = resp;
290      this.numberOfCompleteRows = numberOfCompleteRows;
291      // if there are no more results in region then the scanner at RS side will be closed
292      // automatically so we do not need to renew lease.
293      if (resp.getMoreResultsInRegion()) {
294        // schedule renew lease task
295        scheduleRenewLeaseTask();
296      }
297      return true;
298    }
299  }
300
/**
 * Creates a caller for one already opened region scanner.
 * <p>
 * All arguments are stored as-is. In addition a new rpc controller is obtained from the
 * connection's controller factory, and the handler used when a region has no more results is
 * picked depending on whether the scan is reversed.
 * @param retryTimer timer used to schedule retries and lease renewals
 * @param conn connection whose {@code rpcControllerFactory} creates the controller
 * @param scan the scan to execute; it is modified directly by this class
 * @param scanMetrics metrics object handed to the metric helpers
 * @param scannerId id of the scanner the requests refer to
 * @param resultCache cache the raw results are passed through
 * @param consumer receiver of results and heartbeats
 * @param stub stub used to send the scan requests
 * @param loc location of the region being scanned
 * @param isRegionServerRemote flag handed to the rpc metric helpers
 * @param scannerLeaseTimeoutPeriodNs scanner lease period; renewal is scheduled at half of it
 * @param pauseNs base pause used to compute the retry delay
 * @param maxAttempts maximum number of attempts for one call
 * @param scanTimeoutNs time budget for one call; only enforced when greater than zero
 * @param rpcTimeoutNs timeout for the close-scanner and renew-lease requests
 * @param startLogErrorsCnt failures are logged once the attempt count exceeds this value
 */
public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer,
    AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
    ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub,
    HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
    long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
  // what to scan and where
  this.scan = scan;
  this.scannerId = scannerId;
  this.loc = loc;
  this.stub = stub;
  this.regionServerRemote = isRegionServerRemote;

  // where results and metrics go
  this.resultCache = resultCache;
  this.consumer = consumer;
  this.scanMetrics = scanMetrics;

  // timing and retry settings
  this.retryTimer = retryTimer;
  this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
  this.pauseNs = pauseNs;
  this.maxAttempts = maxAttempts;
  this.scanTimeoutNs = scanTimeoutNs;
  this.rpcTimeoutNs = rpcTimeoutNs;
  this.startLogErrorsCnt = startLogErrorsCnt;

  // derived state
  this.completeWhenNoMoreResultsInRegion =
      scan.isReversed() ? this::completeReversedWhenNoMoreResultsInRegion
          : this::completeWhenNoMoreResultsInRegion;
  this.future = new CompletableFuture<>();
  this.controller = conn.rpcControllerFactory.newController();
  this.exceptions = new ArrayList<>();
}
330
331  private long elapsedMs() {
332    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
333  }
334
335  private long remainingTimeNs() {
336    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
337  }
338
339  private void closeScanner() {
340    incRPCCallsMetrics(scanMetrics, regionServerRemote);
341    resetController(controller, rpcTimeoutNs);
342    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
343    stub.scan(controller, req, resp -> {
344      if (controller.failed()) {
345        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId +
346            " for " + loc.getRegion().getEncodedName() + " of " +
347            loc.getRegion().getTable() + " failed, ignore, probably already closed",
348          controller.getFailed());
349      }
350    });
351  }
352
353  private void completeExceptionally(boolean closeScanner) {
354    resultCache.clear();
355    if (closeScanner) {
356      closeScanner();
357    }
358    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
359  }
360
361  private void completeNoMoreResults() {
362    future.complete(false);
363  }
364
365  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
366    scan.withStartRow(row, inclusive);
367    future.complete(true);
368  }
369
370  private void completeWhenError(boolean closeScanner) {
371    incRPCRetriesMetrics(scanMetrics, closeScanner);
372    resultCache.clear();
373    if (closeScanner) {
374      closeScanner();
375    }
376    if (nextStartRowWhenError != null) {
377      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
378    }
379    future.complete(true);
380  }
381
382  private void onError(Throwable error) {
383    error = translateException(error);
384    if (tries > startLogErrorsCnt) {
385      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " +
386          loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() +
387          " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " +
388          TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() +
389          " ms",
390        error);
391    }
392    boolean scannerClosed = error instanceof UnknownScannerException ||
393        error instanceof NotServingRegionException || error instanceof RegionServerStoppedException;
394    RetriesExhaustedException.ThrowableWithExtraContext qt =
395        new RetriesExhaustedException.ThrowableWithExtraContext(error,
396            EnvironmentEdgeManager.currentTime(), "");
397    exceptions.add(qt);
398    if (tries >= maxAttempts) {
399      completeExceptionally(!scannerClosed);
400      return;
401    }
402    long delayNs;
403    if (scanTimeoutNs > 0) {
404      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
405      if (maxDelayNs <= 0) {
406        completeExceptionally(!scannerClosed);
407        return;
408      }
409      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
410    } else {
411      delayNs = getPauseTime(pauseNs, tries - 1);
412    }
413    if (scannerClosed) {
414      completeWhenError(false);
415      return;
416    }
417    if (error instanceof OutOfOrderScannerNextException || error instanceof ScannerResetException) {
418      completeWhenError(true);
419      return;
420    }
421    if (error instanceof DoNotRetryIOException) {
422      completeExceptionally(true);
423      return;
424    }
425    tries++;
426    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
427  }
428
429  private void updateNextStartRowWhenError(Result result) {
430    nextStartRowWhenError = result.getRow();
431    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
432  }
433
434  private void completeWhenNoMoreResultsInRegion() {
435    if (noMoreResultsForScan(scan, loc.getRegion())) {
436      completeNoMoreResults();
437    } else {
438      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
439    }
440  }
441
442  private void completeReversedWhenNoMoreResultsInRegion() {
443    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
444      completeNoMoreResults();
445    } else {
446      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
447    }
448  }
449
450  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
451    if (resp.hasMoreResults() && !resp.getMoreResults()) {
452      // RS tells us there is no more data for the whole scan
453      completeNoMoreResults();
454      return;
455    }
456    if (scan.getLimit() > 0) {
457      // The RS should have set the moreResults field in ScanResponse to false when we have reached
458      // the limit, so we add an assert here.
459      int newLimit = scan.getLimit() - numberOfCompleteRows;
460      assert newLimit > 0;
461      scan.setLimit(newLimit);
462    }
463    // as in 2.0 this value will always be set
464    if (!resp.getMoreResultsInRegion()) {
465      completeWhenNoMoreResultsInRegion.run();
466      return;
467    }
468    next();
469  }
470
471  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
472    if (controller.failed()) {
473      onError(controller.getFailed());
474      return;
475    }
476    updateServerSideMetrics(scanMetrics, resp);
477    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
478    Result[] rawResults;
479    Result[] results;
480    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
481    try {
482      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
483      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
484      results = resultCache.addAndGet(
485        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
486        isHeartbeatMessage);
487    } catch (IOException e) {
488      // We can not retry here. The server has responded normally and the call sequence has been
489      // increased so a new scan with the same call sequence will cause an
490      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
491      LOG.warn("decode scan response failed", e);
492      completeWhenError(true);
493      return;
494    }
495
496    ScanControllerImpl scanController;
497    if (results.length > 0) {
498      scanController = new ScanControllerImpl(
499          resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
500              : Optional.empty());
501      updateNextStartRowWhenError(results[results.length - 1]);
502      consumer.onNext(results, scanController);
503    } else {
504      Optional<Cursor> cursor = Optional.empty();
505      if (resp.hasCursor()) {
506        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
507      } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
508        // It is size limit exceed and we need to return the last Result's row.
509        // When user setBatch and the scanner is reopened, the server may return Results that
510        // user has seen and the last Result can not be seen because the number is not enough.
511        // So the row keys of results may not be same, we must use the last one.
512        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
513      }
514      scanController = new ScanControllerImpl(cursor);
515      if (isHeartbeatMessage || cursor.isPresent()) {
516        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
517        // want to pass a cursor to upper layer.
518        consumer.onHeartbeat(scanController);
519      }
520    }
521    ScanControllerState state = scanController.destroy();
522    if (state == ScanControllerState.TERMINATED) {
523      if (resp.getMoreResultsInRegion()) {
524        // we have more results in region but user request to stop the scan, so we need to close the
525        // scanner explicitly.
526        closeScanner();
527      }
528      completeNoMoreResults();
529      return;
530    }
531    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
532    if (state == ScanControllerState.SUSPENDED) {
533      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
534        return;
535      }
536    }
537    completeOrNext(resp, numberOfCompleteRows);
538  }
539
540  private void call() {
541    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
542    // less than the scan timeout. If the server does not respond in time(usually this will not
543    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
544    // resending the next request and the only way to fix this is to close the scanner and open a
545    // new one.
546    long callTimeoutNs;
547    if (scanTimeoutNs > 0) {
548      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
549      if (remainingNs <= 0) {
550        completeExceptionally(true);
551        return;
552      }
553      callTimeoutNs = remainingNs;
554    } else {
555      callTimeoutNs = 0L;
556    }
557    incRPCCallsMetrics(scanMetrics, regionServerRemote);
558    if (tries > 1) {
559      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
560    }
561    resetController(controller, callTimeoutNs);
562    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
563      nextCallSeq, false, false, scan.getLimit());
564    stub.scan(controller, req, resp -> onComplete(controller, resp));
565  }
566
567  private void next() {
568    nextCallSeq++;
569    tries = 1;
570    exceptions.clear();
571    nextCallStartNs = System.nanoTime();
572    call();
573  }
574
575  private void renewLease() {
576    incRPCCallsMetrics(scanMetrics, regionServerRemote);
577    nextCallSeq++;
578    resetController(controller, rpcTimeoutNs);
579    ScanRequest req =
580        RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
581    stub.scan(controller, req, resp -> {
582    });
583  }
584
585  /**
586   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
587   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
588   * open scanner request is also needed because we may have some data in the CellScanner which is
589   * contained in the controller.
590   * @return {@code true} if we should continue, otherwise {@code false}.
591   */
592  public CompletableFuture<Boolean> start(HBaseRpcController controller,
593      ScanResponse respWhenOpen) {
594    onComplete(controller, respWhenOpen);
595    return future;
596  }
597}