001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; 029import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.Optional; 035import java.util.concurrent.CompletableFuture; 036import java.util.concurrent.TimeUnit; 037import org.apache.hadoop.hbase.DoNotRetryIOException; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.NotServingRegionException; 040import org.apache.hadoop.hbase.UnknownScannerException; 041import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer; 042import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 043import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 044import org.apache.hadoop.hbase.exceptions.ScannerResetException; 045import org.apache.hadoop.hbase.ipc.HBaseRpcController; 046import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 053import org.apache.hbase.thirdparty.io.netty.util.Timeout; 054import org.apache.hbase.thirdparty.io.netty.util.Timer; 055 056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 058import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 063 064/** 065 * Retry caller for scanning a region. 066 * <p> 067 * We will modify the {@link Scan} object passed in directly. The upper layer should store the 068 * reference of this object and use it to open new single region scanners. 069 */ 070@InterfaceAudience.Private 071class AsyncScanSingleRegionRpcRetryingCaller { 072 073 private static final Logger LOG = 074 LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class); 075 076 private final Timer retryTimer; 077 078 private final Scan scan; 079 080 private final ScanMetrics scanMetrics; 081 082 private final long scannerId; 083 084 private final ScanResultCache resultCache; 085 086 private final AdvancedScanResultConsumer consumer; 087 088 private final ClientService.Interface stub; 089 090 private final HRegionLocation loc; 091 092 private final boolean regionServerRemote; 093 094 private final long scannerLeaseTimeoutPeriodNs; 095 096 private final long pauseNs; 097 098 private final int maxAttempts; 099 100 private final long scanTimeoutNs; 101 102 private final long rpcTimeoutNs; 103 104 private final int startLogErrorsCnt; 105 106 private final Runnable completeWhenNoMoreResultsInRegion; 107 108 private final CompletableFuture<Boolean> future; 109 110 private final HBaseRpcController controller; 111 112 private byte[] nextStartRowWhenError; 113 114 private boolean includeNextStartRowWhenError; 115 116 private long nextCallStartNs; 117 118 private int tries; 119 120 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions; 121 122 private long nextCallSeq = -1L; 123 124 private enum ScanControllerState { 125 INITIALIZED, SUSPENDED, TERMINATED, DESTROYED 126 } 127 128 // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments 129 // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid 130 // usage. We use two things to prevent invalid usage: 131 // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an 132 // IllegalStateException if the caller thread is not this thread. 133 // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will 134 // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to 135 // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will 136 // call destroy to get the current state and set the state to DESTROYED. And when user calls 137 // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw 138 // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call 139 // suspend or terminate so the state will still be INITIALIZED when back from onNext or 140 // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller 141 // to be used in the future. 142 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 143 // package private methods can only be called within the implementation of 144 // AsyncScanSingleRegionRpcRetryingCaller. 145 private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController { 146 147 // Make sure the methods are only called in this thread. 148 private final Thread callerThread; 149 150 private final Optional<Cursor> cursor; 151 152 // INITIALIZED -> SUSPENDED -> DESTROYED 153 // INITIALIZED -> TERMINATED -> DESTROYED 154 // INITIALIZED -> DESTROYED 155 // If the state is incorrect we will throw IllegalStateException. 156 private ScanControllerState state = ScanControllerState.INITIALIZED; 157 158 private ScanResumerImpl resumer; 159 160 public ScanControllerImpl(Optional<Cursor> cursor) { 161 this.callerThread = Thread.currentThread(); 162 this.cursor = cursor; 163 } 164 165 private void preCheck() { 166 Preconditions.checkState(Thread.currentThread() == callerThread, 167 "The current thread is %s, expected thread is %s, " + 168 "you should not call this method outside onNext or onHeartbeat", 169 Thread.currentThread(), callerThread); 170 Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED), 171 "Invalid Stopper state %s", state); 172 } 173 174 @Override 175 public ScanResumer suspend() { 176 preCheck(); 177 state = ScanControllerState.SUSPENDED; 178 ScanResumerImpl resumer = new ScanResumerImpl(); 179 this.resumer = resumer; 180 return resumer; 181 } 182 183 @Override 184 public void terminate() { 185 preCheck(); 186 state = ScanControllerState.TERMINATED; 187 } 188 189 // return the current state, and set the state to DESTROYED. 190 ScanControllerState destroy() { 191 ScanControllerState state = this.state; 192 this.state = ScanControllerState.DESTROYED; 193 return state; 194 } 195 196 @Override 197 public Optional<Cursor> cursor() { 198 return cursor; 199 } 200 } 201 202 private enum ScanResumerState { 203 INITIALIZED, SUSPENDED, RESUMED 204 } 205 206 // The resume method is allowed to be called in another thread so here we also use the 207 // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back 208 // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED, 209 // and when user calls resume method, we will change the state to RESUMED. But the resume method 210 // could be called in other thread, and in fact, user could just do this: 211 // controller.suspend().resume() 212 // This is strange but valid. This means the scan could be resumed before we call the prepare 213 // method to do the actual suspend work. So in the resume method, we will check if the state is 214 // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare 215 // method, if the state is RESUMED already, we will just return an let the scan go on. 216 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 217 // package private methods can only be called within the implementation of 218 // AsyncScanSingleRegionRpcRetryingCaller. 219 private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { 220 221 // INITIALIZED -> SUSPENDED -> RESUMED 222 // INITIALIZED -> RESUMED 223 private ScanResumerState state = ScanResumerState.INITIALIZED; 224 225 private ScanResponse resp; 226 227 private int numberOfCompleteRows; 228 229 // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed 230 // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task 231 // every time when the previous task is finished. There could also be race as the renewal is 232 // executed in the timer thread, so we also need to check the state before lease renewal. If the 233 // state is RESUMED already, we will give up lease renewal and also not schedule the next lease 234 // renewal task. 235 private Timeout leaseRenewer; 236 237 @Override 238 public void resume() { 239 // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we 240 // just return at the first if condition without loading the resp and numValidResuls field. If 241 // resume is called after suspend, then it is also safe to just reference resp and 242 // numValidResults after the synchronized block as no one will change it anymore. 243 ScanResponse localResp; 244 int localNumberOfCompleteRows; 245 synchronized (this) { 246 if (state == ScanResumerState.INITIALIZED) { 247 // user calls this method before we call prepare, so just set the state to 248 // RESUMED, the implementation will just go on. 249 state = ScanResumerState.RESUMED; 250 return; 251 } 252 if (state == ScanResumerState.RESUMED) { 253 // already resumed, give up. 254 return; 255 } 256 state = ScanResumerState.RESUMED; 257 if (leaseRenewer != null) { 258 leaseRenewer.cancel(); 259 } 260 localResp = this.resp; 261 localNumberOfCompleteRows = this.numberOfCompleteRows; 262 } 263 completeOrNext(localResp, localNumberOfCompleteRows); 264 } 265 266 private void scheduleRenewLeaseTask() { 267 leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2, 268 TimeUnit.NANOSECONDS); 269 } 270 271 private synchronized void tryRenewLease() { 272 // the scan has already been resumed, give up 273 if (state == ScanResumerState.RESUMED) { 274 return; 275 } 276 renewLease(); 277 // schedule the next renew lease task again as this is a one-time task. 278 scheduleRenewLeaseTask(); 279 } 280 281 // return false if the scan has already been resumed. See the comment above for ScanResumerImpl 282 // for more details. 283 synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) { 284 if (state == ScanResumerState.RESUMED) { 285 // user calls resume before we actually suspend the scan, just continue; 286 return false; 287 } 288 state = ScanResumerState.SUSPENDED; 289 this.resp = resp; 290 this.numberOfCompleteRows = numberOfCompleteRows; 291 // if there are no more results in region then the scanner at RS side will be closed 292 // automatically so we do not need to renew lease. 293 if (resp.getMoreResultsInRegion()) { 294 // schedule renew lease task 295 scheduleRenewLeaseTask(); 296 } 297 return true; 298 } 299 } 300 301 public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, 302 AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId, 303 ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub, 304 HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs, 305 long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { 306 this.retryTimer = retryTimer; 307 this.scan = scan; 308 this.scanMetrics = scanMetrics; 309 this.scannerId = scannerId; 310 this.resultCache = resultCache; 311 this.consumer = consumer; 312 this.stub = stub; 313 this.loc = loc; 314 this.regionServerRemote = isRegionServerRemote; 315 this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; 316 this.pauseNs = pauseNs; 317 this.maxAttempts = maxAttempts; 318 this.scanTimeoutNs = scanTimeoutNs; 319 this.rpcTimeoutNs = rpcTimeoutNs; 320 this.startLogErrorsCnt = startLogErrorsCnt; 321 if (scan.isReversed()) { 322 completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; 323 } else { 324 completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; 325 } 326 this.future = new CompletableFuture<>(); 327 this.controller = conn.rpcControllerFactory.newController(); 328 this.exceptions = new ArrayList<>(); 329 } 330 331 private long elapsedMs() { 332 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); 333 } 334 335 private long remainingTimeNs() { 336 return scanTimeoutNs - (System.nanoTime() - nextCallStartNs); 337 } 338 339 private void closeScanner() { 340 incRPCCallsMetrics(scanMetrics, regionServerRemote); 341 resetController(controller, rpcTimeoutNs); 342 ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); 343 stub.scan(controller, req, resp -> { 344 if (controller.failed()) { 345 LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId + 346 " for " + loc.getRegion().getEncodedName() + " of " + 347 loc.getRegion().getTable() + " failed, ignore, probably already closed", 348 controller.getFailed()); 349 } 350 }); 351 } 352 353 private void completeExceptionally(boolean closeScanner) { 354 resultCache.clear(); 355 if (closeScanner) { 356 closeScanner(); 357 } 358 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); 359 } 360 361 private void completeNoMoreResults() { 362 future.complete(false); 363 } 364 365 private void completeWithNextStartRow(byte[] row, boolean inclusive) { 366 scan.withStartRow(row, inclusive); 367 future.complete(true); 368 } 369 370 private void completeWhenError(boolean closeScanner) { 371 incRPCRetriesMetrics(scanMetrics, closeScanner); 372 resultCache.clear(); 373 if (closeScanner) { 374 closeScanner(); 375 } 376 if (nextStartRowWhenError != null) { 377 scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError); 378 } 379 future.complete(true); 380 } 381 382 private void onError(Throwable error) { 383 error = translateException(error); 384 if (tries > startLogErrorsCnt) { 385 LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " + 386 loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() + 387 " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + 388 TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() + 389 " ms", 390 error); 391 } 392 boolean scannerClosed = 393 error instanceof UnknownScannerException || error instanceof NotServingRegionException || 394 error instanceof RegionServerStoppedException || error instanceof ScannerResetException; 395 RetriesExhaustedException.ThrowableWithExtraContext qt = 396 new RetriesExhaustedException.ThrowableWithExtraContext(error, 397 EnvironmentEdgeManager.currentTime(), ""); 398 exceptions.add(qt); 399 if (tries >= maxAttempts) { 400 completeExceptionally(!scannerClosed); 401 return; 402 } 403 long delayNs; 404 if (scanTimeoutNs > 0) { 405 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; 406 if (maxDelayNs <= 0) { 407 completeExceptionally(!scannerClosed); 408 return; 409 } 410 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); 411 } else { 412 delayNs = getPauseTime(pauseNs, tries - 1); 413 } 414 if (scannerClosed) { 415 completeWhenError(false); 416 return; 417 } 418 if (error instanceof OutOfOrderScannerNextException) { 419 completeWhenError(true); 420 return; 421 } 422 if (error instanceof DoNotRetryIOException) { 423 completeExceptionally(true); 424 return; 425 } 426 tries++; 427 retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS); 428 } 429 430 private void updateNextStartRowWhenError(Result result) { 431 nextStartRowWhenError = result.getRow(); 432 includeNextStartRowWhenError = result.mayHaveMoreCellsInRow(); 433 } 434 435 private void completeWhenNoMoreResultsInRegion() { 436 if (noMoreResultsForScan(scan, loc.getRegion())) { 437 completeNoMoreResults(); 438 } else { 439 completeWithNextStartRow(loc.getRegion().getEndKey(), true); 440 } 441 } 442 443 private void completeReversedWhenNoMoreResultsInRegion() { 444 if (noMoreResultsForReverseScan(scan, loc.getRegion())) { 445 completeNoMoreResults(); 446 } else { 447 completeWithNextStartRow(loc.getRegion().getStartKey(), false); 448 } 449 } 450 451 private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) { 452 if (resp.hasMoreResults() && !resp.getMoreResults()) { 453 // RS tells us there is no more data for the whole scan 454 completeNoMoreResults(); 455 return; 456 } 457 if (scan.getLimit() > 0) { 458 // The RS should have set the moreResults field in ScanResponse to false when we have reached 459 // the limit, so we add an assert here. 460 int newLimit = scan.getLimit() - numberOfCompleteRows; 461 assert newLimit > 0; 462 scan.setLimit(newLimit); 463 } 464 // as in 2.0 this value will always be set 465 if (!resp.getMoreResultsInRegion()) { 466 completeWhenNoMoreResultsInRegion.run(); 467 return; 468 } 469 next(); 470 } 471 472 private void onComplete(HBaseRpcController controller, ScanResponse resp) { 473 if (controller.failed()) { 474 onError(controller.getFailed()); 475 return; 476 } 477 updateServerSideMetrics(scanMetrics, resp); 478 boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage(); 479 Result[] rawResults; 480 Result[] results; 481 int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows(); 482 try { 483 rawResults = ResponseConverter.getResults(controller.cellScanner(), resp); 484 updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage); 485 results = resultCache.addAndGet( 486 Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY), 487 isHeartbeatMessage); 488 } catch (IOException e) { 489 // We can not retry here. The server has responded normally and the call sequence has been 490 // increased so a new scan with the same call sequence will cause an 491 // OutOfOrderScannerNextException. Let the upper layer open a new scanner. 492 LOG.warn("decode scan response failed", e); 493 completeWhenError(true); 494 return; 495 } 496 497 ScanControllerImpl scanController; 498 if (results.length > 0) { 499 scanController = new ScanControllerImpl( 500 resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) 501 : Optional.empty()); 502 updateNextStartRowWhenError(results[results.length - 1]); 503 consumer.onNext(results, scanController); 504 } else { 505 Optional<Cursor> cursor = Optional.empty(); 506 if (resp.hasCursor()) { 507 cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor())); 508 } else if (scan.isNeedCursorResult() && rawResults.length > 0) { 509 // It is size limit exceed and we need to return the last Result's row. 510 // When user setBatch and the scanner is reopened, the server may return Results that 511 // user has seen and the last Result can not be seen because the number is not enough. 512 // So the row keys of results may not be same, we must use the last one. 513 cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow())); 514 } 515 scanController = new ScanControllerImpl(cursor); 516 if (isHeartbeatMessage || cursor.isPresent()) { 517 // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we 518 // want to pass a cursor to upper layer. 519 consumer.onHeartbeat(scanController); 520 } 521 } 522 ScanControllerState state = scanController.destroy(); 523 if (state == ScanControllerState.TERMINATED) { 524 if (resp.getMoreResultsInRegion()) { 525 // we have more results in region but user request to stop the scan, so we need to close the 526 // scanner explicitly. 527 closeScanner(); 528 } 529 completeNoMoreResults(); 530 return; 531 } 532 int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; 533 if (state == ScanControllerState.SUSPENDED) { 534 if (scanController.resumer.prepare(resp, numberOfCompleteRows)) { 535 return; 536 } 537 } 538 completeOrNext(resp, numberOfCompleteRows); 539 } 540 541 private void call() { 542 // As we have a call sequence for scan, it is useless to have a different rpc timeout which is 543 // less than the scan timeout. If the server does not respond in time(usually this will not 544 // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when 545 // resending the next request and the only way to fix this is to close the scanner and open a 546 // new one. 547 long callTimeoutNs; 548 if (scanTimeoutNs > 0) { 549 long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs); 550 if (remainingNs <= 0) { 551 completeExceptionally(true); 552 return; 553 } 554 callTimeoutNs = remainingNs; 555 } else { 556 callTimeoutNs = 0L; 557 } 558 incRPCCallsMetrics(scanMetrics, regionServerRemote); 559 if (tries > 1) { 560 incRPCRetriesMetrics(scanMetrics, regionServerRemote); 561 } 562 resetController(controller, callTimeoutNs); 563 ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, 564 nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit()); 565 stub.scan(controller, req, resp -> onComplete(controller, resp)); 566 } 567 568 private void next() { 569 nextCallSeq++; 570 tries = 1; 571 exceptions.clear(); 572 nextCallStartNs = System.nanoTime(); 573 call(); 574 } 575 576 private void renewLease() { 577 incRPCCallsMetrics(scanMetrics, regionServerRemote); 578 nextCallSeq++; 579 resetController(controller, rpcTimeoutNs); 580 ScanRequest req = 581 RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); 582 stub.scan(controller, req, resp -> { 583 }); 584 } 585 586 /** 587 * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also 588 * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the 589 * open scanner request is also needed because we may have some data in the CellScanner which is 590 * contained in the controller. 591 * @return {@code true} if we should continue, otherwise {@code false}. 592 */ 593 public CompletableFuture<Boolean> start(HBaseRpcController controller, 594 ScanResponse respWhenOpen) { 595 onComplete(controller, respWhenOpen); 596 return future; 597 } 598}