001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; 029import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.Optional; 035import java.util.concurrent.CompletableFuture; 036import java.util.concurrent.TimeUnit; 037import org.apache.hadoop.hbase.DoNotRetryIOException; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.NotServingRegionException; 040import org.apache.hadoop.hbase.UnknownScannerException; 041import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer; 042import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 043import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 044import org.apache.hadoop.hbase.exceptions.ScannerResetException; 045import org.apache.hadoop.hbase.ipc.HBaseRpcController; 046import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 053import org.apache.hbase.thirdparty.io.netty.util.Timeout; 054import org.apache.hbase.thirdparty.io.netty.util.Timer; 055 056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 057import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 058import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 063 064/** 065 * Retry caller for scanning a region. 066 * <p> 067 * We will modify the {@link Scan} object passed in directly. The upper layer should store the 068 * reference of this object and use it to open new single region scanners. 069 */ 070@InterfaceAudience.Private 071class AsyncScanSingleRegionRpcRetryingCaller { 072 073 private static final Logger LOG = 074 LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class); 075 076 private final Timer retryTimer; 077 078 private final Scan scan; 079 080 private final ScanMetrics scanMetrics; 081 082 private final long scannerId; 083 084 private final ScanResultCache resultCache; 085 086 private final AdvancedScanResultConsumer consumer; 087 088 private final ClientService.Interface stub; 089 090 private final HRegionLocation loc; 091 092 private final boolean regionServerRemote; 093 094 private final long scannerLeaseTimeoutPeriodNs; 095 096 private final long pauseNs; 097 098 private final int maxAttempts; 099 100 private final long scanTimeoutNs; 101 102 private final long rpcTimeoutNs; 103 104 private final int startLogErrorsCnt; 105 106 private final Runnable completeWhenNoMoreResultsInRegion; 107 108 private final CompletableFuture<Boolean> future; 109 110 private final HBaseRpcController controller; 111 112 private byte[] nextStartRowWhenError; 113 114 private boolean includeNextStartRowWhenError; 115 116 private long nextCallStartNs; 117 118 private int tries; 119 120 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions; 121 122 private long nextCallSeq = -1L; 123 124 private enum ScanControllerState { 125 INITIALIZED, SUSPENDED, TERMINATED, DESTROYED 126 } 127 128 // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments 129 // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid 130 // usage. We use two things to prevent invalid usage: 131 // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an 132 // IllegalStateException if the caller thread is not this thread. 133 // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will 134 // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to 135 // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will 136 // call destroy to get the current state and set the state to DESTROYED. And when user calls 137 // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw 138 // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call 139 // suspend or terminate so the state will still be INITIALIZED when back from onNext or 140 // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller 141 // to be used in the future. 142 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 143 // package private methods can only be called within the implementation of 144 // AsyncScanSingleRegionRpcRetryingCaller. 145 private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController { 146 147 // Make sure the methods are only called in this thread. 148 private final Thread callerThread; 149 150 private final Optional<Cursor> cursor; 151 152 // INITIALIZED -> SUSPENDED -> DESTROYED 153 // INITIALIZED -> TERMINATED -> DESTROYED 154 // INITIALIZED -> DESTROYED 155 // If the state is incorrect we will throw IllegalStateException. 156 private ScanControllerState state = ScanControllerState.INITIALIZED; 157 158 private ScanResumerImpl resumer; 159 160 public ScanControllerImpl(Optional<Cursor> cursor) { 161 this.callerThread = Thread.currentThread(); 162 this.cursor = cursor; 163 } 164 165 private void preCheck() { 166 Preconditions.checkState(Thread.currentThread() == callerThread, 167 "The current thread is %s, expected thread is %s, " + 168 "you should not call this method outside onNext or onHeartbeat", 169 Thread.currentThread(), callerThread); 170 Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED), 171 "Invalid Stopper state %s", state); 172 } 173 174 @Override 175 public ScanResumer suspend() { 176 preCheck(); 177 state = ScanControllerState.SUSPENDED; 178 ScanResumerImpl resumer = new ScanResumerImpl(); 179 this.resumer = resumer; 180 return resumer; 181 } 182 183 @Override 184 public void terminate() { 185 preCheck(); 186 state = ScanControllerState.TERMINATED; 187 } 188 189 // return the current state, and set the state to DESTROYED. 190 ScanControllerState destroy() { 191 ScanControllerState state = this.state; 192 this.state = ScanControllerState.DESTROYED; 193 return state; 194 } 195 196 @Override 197 public Optional<Cursor> cursor() { 198 return cursor; 199 } 200 } 201 202 private enum ScanResumerState { 203 INITIALIZED, SUSPENDED, RESUMED 204 } 205 206 // The resume method is allowed to be called in another thread so here we also use the 207 // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back 208 // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED, 209 // and when user calls resume method, we will change the state to RESUMED. But the resume method 210 // could be called in other thread, and in fact, user could just do this: 211 // controller.suspend().resume() 212 // This is strange but valid. This means the scan could be resumed before we call the prepare 213 // method to do the actual suspend work. So in the resume method, we will check if the state is 214 // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare 215 // method, if the state is RESUMED already, we will just return an let the scan go on. 216 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 217 // package private methods can only be called within the implementation of 218 // AsyncScanSingleRegionRpcRetryingCaller. 219 private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { 220 221 // INITIALIZED -> SUSPENDED -> RESUMED 222 // INITIALIZED -> RESUMED 223 private ScanResumerState state = ScanResumerState.INITIALIZED; 224 225 private ScanResponse resp; 226 227 private int numberOfCompleteRows; 228 229 // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed 230 // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task 231 // every time when the previous task is finished. There could also be race as the renewal is 232 // executed in the timer thread, so we also need to check the state before lease renewal. If the 233 // state is RESUMED already, we will give up lease renewal and also not schedule the next lease 234 // renewal task. 235 private Timeout leaseRenewer; 236 237 @Override 238 public void resume() { 239 // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we 240 // just return at the first if condition without loading the resp and numValidResuls field. If 241 // resume is called after suspend, then it is also safe to just reference resp and 242 // numValidResults after the synchronized block as no one will change it anymore. 243 ScanResponse localResp; 244 int localNumberOfCompleteRows; 245 synchronized (this) { 246 if (state == ScanResumerState.INITIALIZED) { 247 // user calls this method before we call prepare, so just set the state to 248 // RESUMED, the implementation will just go on. 249 state = ScanResumerState.RESUMED; 250 return; 251 } 252 if (state == ScanResumerState.RESUMED) { 253 // already resumed, give up. 254 return; 255 } 256 state = ScanResumerState.RESUMED; 257 if (leaseRenewer != null) { 258 leaseRenewer.cancel(); 259 } 260 localResp = this.resp; 261 localNumberOfCompleteRows = this.numberOfCompleteRows; 262 } 263 completeOrNext(localResp, localNumberOfCompleteRows); 264 } 265 266 private void scheduleRenewLeaseTask() { 267 leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2, 268 TimeUnit.NANOSECONDS); 269 } 270 271 private synchronized void tryRenewLease() { 272 // the scan has already been resumed, give up 273 if (state == ScanResumerState.RESUMED) { 274 return; 275 } 276 renewLease(); 277 // schedule the next renew lease task again as this is a one-time task. 278 scheduleRenewLeaseTask(); 279 } 280 281 // return false if the scan has already been resumed. See the comment above for ScanResumerImpl 282 // for more details. 283 synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) { 284 if (state == ScanResumerState.RESUMED) { 285 // user calls resume before we actually suspend the scan, just continue; 286 return false; 287 } 288 state = ScanResumerState.SUSPENDED; 289 this.resp = resp; 290 this.numberOfCompleteRows = numberOfCompleteRows; 291 // if there are no more results in region then the scanner at RS side will be closed 292 // automatically so we do not need to renew lease. 293 if (resp.getMoreResultsInRegion()) { 294 // schedule renew lease task 295 scheduleRenewLeaseTask(); 296 } 297 return true; 298 } 299 } 300 301 public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, 302 AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId, 303 ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub, 304 HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs, 305 long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { 306 this.retryTimer = retryTimer; 307 this.scan = scan; 308 this.scanMetrics = scanMetrics; 309 this.scannerId = scannerId; 310 this.resultCache = resultCache; 311 this.consumer = consumer; 312 this.stub = stub; 313 this.loc = loc; 314 this.regionServerRemote = isRegionServerRemote; 315 this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; 316 this.pauseNs = pauseNs; 317 this.maxAttempts = maxAttempts; 318 this.scanTimeoutNs = scanTimeoutNs; 319 this.rpcTimeoutNs = rpcTimeoutNs; 320 this.startLogErrorsCnt = startLogErrorsCnt; 321 if (scan.isReversed()) { 322 completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; 323 } else { 324 completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; 325 } 326 this.future = new CompletableFuture<>(); 327 this.controller = conn.rpcControllerFactory.newController(); 328 this.exceptions = new ArrayList<>(); 329 } 330 331 private long elapsedMs() { 332 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); 333 } 334 335 private long remainingTimeNs() { 336 return scanTimeoutNs - (System.nanoTime() - nextCallStartNs); 337 } 338 339 private void closeScanner() { 340 incRPCCallsMetrics(scanMetrics, regionServerRemote); 341 resetController(controller, rpcTimeoutNs); 342 ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); 343 stub.scan(controller, req, resp -> { 344 if (controller.failed()) { 345 LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId + 346 " for " + loc.getRegion().getEncodedName() + " of " + 347 loc.getRegion().getTable() + " failed, ignore, probably already closed", 348 controller.getFailed()); 349 } 350 }); 351 } 352 353 private void completeExceptionally(boolean closeScanner) { 354 resultCache.clear(); 355 if (closeScanner) { 356 closeScanner(); 357 } 358 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); 359 } 360 361 private void completeNoMoreResults() { 362 future.complete(false); 363 } 364 365 private void completeWithNextStartRow(byte[] row, boolean inclusive) { 366 scan.withStartRow(row, inclusive); 367 future.complete(true); 368 } 369 370 private void completeWhenError(boolean closeScanner) { 371 incRPCRetriesMetrics(scanMetrics, closeScanner); 372 resultCache.clear(); 373 if (closeScanner) { 374 closeScanner(); 375 } 376 if (nextStartRowWhenError != null) { 377 scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError); 378 } 379 future.complete(true); 380 } 381 382 private void onError(Throwable error) { 383 error = translateException(error); 384 if (tries > startLogErrorsCnt) { 385 LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " + 386 loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() + 387 " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " + 388 TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() + 389 " ms", 390 error); 391 } 392 boolean scannerClosed = error instanceof UnknownScannerException || 393 error instanceof NotServingRegionException || error instanceof RegionServerStoppedException; 394 RetriesExhaustedException.ThrowableWithExtraContext qt = 395 new RetriesExhaustedException.ThrowableWithExtraContext(error, 396 EnvironmentEdgeManager.currentTime(), ""); 397 exceptions.add(qt); 398 if (tries >= maxAttempts) { 399 completeExceptionally(!scannerClosed); 400 return; 401 } 402 long delayNs; 403 if (scanTimeoutNs > 0) { 404 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; 405 if (maxDelayNs <= 0) { 406 completeExceptionally(!scannerClosed); 407 return; 408 } 409 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); 410 } else { 411 delayNs = getPauseTime(pauseNs, tries - 1); 412 } 413 if (scannerClosed) { 414 completeWhenError(false); 415 return; 416 } 417 if (error instanceof OutOfOrderScannerNextException || error instanceof ScannerResetException) { 418 completeWhenError(true); 419 return; 420 } 421 if (error instanceof DoNotRetryIOException) { 422 completeExceptionally(true); 423 return; 424 } 425 tries++; 426 retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS); 427 } 428 429 private void updateNextStartRowWhenError(Result result) { 430 nextStartRowWhenError = result.getRow(); 431 includeNextStartRowWhenError = result.mayHaveMoreCellsInRow(); 432 } 433 434 private void completeWhenNoMoreResultsInRegion() { 435 if (noMoreResultsForScan(scan, loc.getRegion())) { 436 completeNoMoreResults(); 437 } else { 438 completeWithNextStartRow(loc.getRegion().getEndKey(), true); 439 } 440 } 441 442 private void completeReversedWhenNoMoreResultsInRegion() { 443 if (noMoreResultsForReverseScan(scan, loc.getRegion())) { 444 completeNoMoreResults(); 445 } else { 446 completeWithNextStartRow(loc.getRegion().getStartKey(), false); 447 } 448 } 449 450 private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) { 451 if (resp.hasMoreResults() && !resp.getMoreResults()) { 452 // RS tells us there is no more data for the whole scan 453 completeNoMoreResults(); 454 return; 455 } 456 if (scan.getLimit() > 0) { 457 // The RS should have set the moreResults field in ScanResponse to false when we have reached 458 // the limit, so we add an assert here. 459 int newLimit = scan.getLimit() - numberOfCompleteRows; 460 assert newLimit > 0; 461 scan.setLimit(newLimit); 462 } 463 // as in 2.0 this value will always be set 464 if (!resp.getMoreResultsInRegion()) { 465 completeWhenNoMoreResultsInRegion.run(); 466 return; 467 } 468 next(); 469 } 470 471 private void onComplete(HBaseRpcController controller, ScanResponse resp) { 472 if (controller.failed()) { 473 onError(controller.getFailed()); 474 return; 475 } 476 updateServerSideMetrics(scanMetrics, resp); 477 boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage(); 478 Result[] rawResults; 479 Result[] results; 480 int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows(); 481 try { 482 rawResults = ResponseConverter.getResults(controller.cellScanner(), resp); 483 updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage); 484 results = resultCache.addAndGet( 485 Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY), 486 isHeartbeatMessage); 487 } catch (IOException e) { 488 // We can not retry here. The server has responded normally and the call sequence has been 489 // increased so a new scan with the same call sequence will cause an 490 // OutOfOrderScannerNextException. Let the upper layer open a new scanner. 491 LOG.warn("decode scan response failed", e); 492 completeWhenError(true); 493 return; 494 } 495 496 ScanControllerImpl scanController; 497 if (results.length > 0) { 498 scanController = new ScanControllerImpl( 499 resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) 500 : Optional.empty()); 501 updateNextStartRowWhenError(results[results.length - 1]); 502 consumer.onNext(results, scanController); 503 } else { 504 Optional<Cursor> cursor = Optional.empty(); 505 if (resp.hasCursor()) { 506 cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor())); 507 } else if (scan.isNeedCursorResult() && rawResults.length > 0) { 508 // It is size limit exceed and we need to return the last Result's row. 509 // When user setBatch and the scanner is reopened, the server may return Results that 510 // user has seen and the last Result can not be seen because the number is not enough. 511 // So the row keys of results may not be same, we must use the last one. 512 cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow())); 513 } 514 scanController = new ScanControllerImpl(cursor); 515 if (isHeartbeatMessage || cursor.isPresent()) { 516 // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we 517 // want to pass a cursor to upper layer. 518 consumer.onHeartbeat(scanController); 519 } 520 } 521 ScanControllerState state = scanController.destroy(); 522 if (state == ScanControllerState.TERMINATED) { 523 if (resp.getMoreResultsInRegion()) { 524 // we have more results in region but user request to stop the scan, so we need to close the 525 // scanner explicitly. 526 closeScanner(); 527 } 528 completeNoMoreResults(); 529 return; 530 } 531 int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; 532 if (state == ScanControllerState.SUSPENDED) { 533 if (scanController.resumer.prepare(resp, numberOfCompleteRows)) { 534 return; 535 } 536 } 537 completeOrNext(resp, numberOfCompleteRows); 538 } 539 540 private void call() { 541 // As we have a call sequence for scan, it is useless to have a different rpc timeout which is 542 // less than the scan timeout. If the server does not respond in time(usually this will not 543 // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when 544 // resending the next request and the only way to fix this is to close the scanner and open a 545 // new one. 546 long callTimeoutNs; 547 if (scanTimeoutNs > 0) { 548 long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs); 549 if (remainingNs <= 0) { 550 completeExceptionally(true); 551 return; 552 } 553 callTimeoutNs = remainingNs; 554 } else { 555 callTimeoutNs = 0L; 556 } 557 incRPCCallsMetrics(scanMetrics, regionServerRemote); 558 if (tries > 1) { 559 incRPCRetriesMetrics(scanMetrics, regionServerRemote); 560 } 561 resetController(controller, callTimeoutNs); 562 ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, 563 nextCallSeq, false, false, scan.getLimit()); 564 stub.scan(controller, req, resp -> onComplete(controller, resp)); 565 } 566 567 private void next() { 568 nextCallSeq++; 569 tries = 1; 570 exceptions.clear(); 571 nextCallStartNs = System.nanoTime(); 572 call(); 573 } 574 575 private void renewLease() { 576 incRPCCallsMetrics(scanMetrics, regionServerRemote); 577 nextCallSeq++; 578 resetController(controller, rpcTimeoutNs); 579 ScanRequest req = 580 RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); 581 stub.scan(controller, req, resp -> { 582 }); 583 } 584 585 /** 586 * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also 587 * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the 588 * open scanner request is also needed because we may have some data in the CellScanner which is 589 * contained in the controller. 590 * @return {@code true} if we should continue, otherwise {@code false}. 591 */ 592 public CompletableFuture<Boolean> start(HBaseRpcController controller, 593 ScanResponse respWhenOpen) { 594 onComplete(controller, respWhenOpen); 595 return future; 596 } 597}