/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Retry caller for scanning a region.
 * <p>
 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
 * reference of this object and use it to open new single region scanners.
 * <p>
 * The outcome is reported through the future returned by
 * {@link #start(HBaseRpcController, ScanResponse)}: it is completed with {@code false} when the
 * scan is finished or terminated by the consumer, with {@code true} when the upper layer should
 * go on (with the start row of the {@link Scan} possibly updated), and exceptionally with a
 * {@link RetriesExhaustedException} when we give up.
 */
@InterfaceAudience.Private
class AsyncScanSingleRegionRpcRetryingCaller {

  private static final Logger LOG =
    LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);

  // Used to schedule retries and the lease renewal tasks.
  private final Timer retryTimer;

  private final Scan scan;

  private final ScanMetrics scanMetrics;

  private final long scannerId;

  private final ScanResultCache resultCache;

  private final AdvancedScanResultConsumer consumer;

  private final ClientService.Interface stub;

  private final HRegionLocation loc;

  private final boolean regionServerRemote;

  private final int priority;

  // A suspended scan renews its lease every half of this period.
  private final long scannerLeaseTimeoutPeriodNs;

  private final long pauseNs;

  // The base pause to use instead of pauseNs when the failure is a CallQueueTooBigException.
  private final long pauseForCQTBENs;

  private final int maxAttempts;

  // A value which is not greater than zero means no timeout, see onError and call.
  private final long scanTimeoutNs;

  // Only used for the close scanner and renew lease requests, a normal scan request uses the
  // remaining scan timeout, see the comment in call for the reason.
  private final long rpcTimeoutNs;

  // A failure is only logged once the number of tries is greater than this value.
  private final int startLogErrorsCnt;

  // Selected in the constructor according to the direction of the scan.
  private final Runnable completeWhenNoMoreResultsInRegion;

  private final CompletableFuture<Boolean> future;

  // Reused for all the scan, close scanner and renew lease requests sent by this caller.
  private final HBaseRpcController controller;

  // The start row to set on the scan when we complete with an error, i.e, the row of the last
  // result which has been passed to the consumer. Null if we have not passed any results yet.
  private byte[] nextStartRowWhenError;

  private boolean includeNextStartRowWhenError;

  // The start time of the current 'next' operation, reset in next() but not in call(), so all
  // the retries of the same operation share the same scan timeout.
  private long nextCallStartNs;

  private int tries;

  // The failures of the current 'next' operation, cleared in next().
  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;

  // Increased before every new 'next' operation and every lease renewal, but not when retrying.
  private long nextCallSeq = -1L;

  private enum ScanControllerState {
    INITIALIZED, SUSPENDED, TERMINATED, DESTROYED
  }

  // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
  // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some checks to prevent invalid
  // usage. We use two things to prevent invalid usage:
  // 1. Record the thread that constructs the ScanControllerImpl instance. We will throw an
  // IllegalStateException if the caller thread is not this thread.
  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
  // call destroy to get the current state and set the state to DESTROYED. And when user calls
  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
  // from being used in the future.
  // Notice that, the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {

    // Make sure the methods are only called in this thread.
    private final Thread callerThread;

    private final Optional<Cursor> cursor;

    // INITIALIZED -> SUSPENDED -> DESTROYED
    // INITIALIZED -> TERMINATED -> DESTROYED
    // INITIALIZED -> DESTROYED
    // If the state is incorrect we will throw IllegalStateException.
    private ScanControllerState state = ScanControllerState.INITIALIZED;

    // Only set when suspend is called, read by onComplete when the state is SUSPENDED.
    private ScanResumerImpl resumer;

    public ScanControllerImpl(Optional<Cursor> cursor) {
      this.callerThread = Thread.currentThread();
      this.cursor = cursor;
    }

    // Throws IllegalStateException if we are not in the thread which created this controller, or
    // if suspend or terminate has already been called, or if the controller has been destroyed.
    private void preCheck() {
      Preconditions.checkState(Thread.currentThread() == callerThread,
        "The current thread is %s, expected thread is %s, " +
          "you should not call this method outside onNext or onHeartbeat",
        Thread.currentThread(), callerThread);
      Preconditions.checkState(state == ScanControllerState.INITIALIZED,
        "Invalid Stopper state %s", state);
    }

    @Override
    public ScanResumer suspend() {
      preCheck();
      state = ScanControllerState.SUSPENDED;
      ScanResumerImpl newResumer = new ScanResumerImpl();
      this.resumer = newResumer;
      return newResumer;
    }

    @Override
    public void terminate() {
      preCheck();
      state = ScanControllerState.TERMINATED;
    }

    // return the current state, and set the state to DESTROYED.
    ScanControllerState destroy() {
      ScanControllerState previous = this.state;
      this.state = ScanControllerState.DESTROYED;
      return previous;
    }

    @Override
    public Optional<Cursor> cursor() {
      return cursor;
    }
  }

  private enum ScanResumerState {
    INITIALIZED, SUSPENDED, RESUMED
  }

  // The resume method is allowed to be called in another thread so here we also use the
  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
  // and when user calls resume method, we will change the state to RESUMED. But the resume method
  // could be called in other thread, and in fact, user could just do this:
  // controller.suspend().resume()
  // This is strange but valid. This means the scan could be resumed before we call the prepare
  // method to do the actual suspend work. So in the resume method, we will check if the state is
  // INITIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
  // method, if the state is RESUMED already, we will just return and let the scan go on.
  // Notice that, the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {

    // INITIALIZED -> SUSPENDED -> RESUMED
    // INITIALIZED -> RESUMED
    private ScanResumerState state = ScanResumerState.INITIALIZED;

    // The response and the number of complete rows recorded by prepare, consumed by resume.
    private ScanResponse resp;

    private int numberOfCompleteRows;

    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
    // every time when the previous task is finished. There could also be race as the renewal is
    // executed in the timer thread, so we also need to check the state before lease renewal. If the
    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
    // renewal task.
    private Timeout leaseRenewer;

    @Override
    public void resume() {
      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
      // just return at the first if condition without loading the resp and numberOfCompleteRows
      // field. If resume is called after suspend, then it is also safe to just reference resp and
      // numberOfCompleteRows after the synchronized block as no one will change it anymore.
      ScanResponse localResp;
      int localNumberOfCompleteRows;
      synchronized (this) {
        if (state == ScanResumerState.INITIALIZED) {
          // user calls this method before we call prepare, so just set the state to
          // RESUMED, the implementation will just go on.
          state = ScanResumerState.RESUMED;
          return;
        }
        if (state == ScanResumerState.RESUMED) {
          // already resumed, give up.
          return;
        }
        state = ScanResumerState.RESUMED;
        if (leaseRenewer != null) {
          leaseRenewer.cancel();
        }
        localResp = this.resp;
        localNumberOfCompleteRows = this.numberOfCompleteRows;
      }
      // continue the scan outside the lock.
      completeOrNext(localResp, localNumberOfCompleteRows);
    }

    // Only called with the lock of this object held, from prepare and tryRenewLease.
    private void scheduleRenewLeaseTask() {
      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
        TimeUnit.NANOSECONDS);
    }

    private synchronized void tryRenewLease() {
      // the scan has already been resumed, give up
      if (state == ScanResumerState.RESUMED) {
        return;
      }
      renewLease();
      // schedule the next renew lease task again as this is a one-time task.
      scheduleRenewLeaseTask();
    }

    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
    // for more details.
    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
      if (state == ScanResumerState.RESUMED) {
        // user calls resume before we actually suspend the scan, just continue;
        return false;
      }
      state = ScanResumerState.SUSPENDED;
      this.resp = resp;
      this.numberOfCompleteRows = numberOfCompleteRows;
      // if there are no more results in region then the scanner at RS side will be closed
      // automatically so we do not need to renew lease.
      if (resp.getMoreResultsInRegion()) {
        // schedule renew lease task
        scheduleRenewLeaseTask();
      }
      return true;
    }
  }

  /**
   * Create a caller for a scanner which has already been opened on the region server. Nothing is
   * sent until {@link #start(HBaseRpcController, ScanResponse)} is called.
   * @param conn only used to create the {@link HBaseRpcController} which is reused for all the
   *          requests sent by this caller
   * @param scan the scan to execute, it will be modified directly, see the class comment
   * @param scannerId the id of the scanner opened on the region server
   * @param scannerLeaseTimeoutPeriodNs a suspended scan renews its lease every half of this period
   * @param pauseForCQTBENs the base pause to use when the failure is a
   *          {@link CallQueueTooBigException}
   * @param scanTimeoutNs the timeout for one 'next' operation including all its retries, a value
   *          which is not greater than zero means no timeout
   * @param rpcTimeoutNs the timeout for the close scanner and renew lease requests
   * @param startLogErrorsCnt a failure is only logged once the number of tries is greater than
   *          this value
   */
  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
    Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
    AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
    boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
    long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
    int startLogErrorsCnt) {
    this.retryTimer = retryTimer;
    this.scan = scan;
    this.scanMetrics = scanMetrics;
    this.scannerId = scannerId;
    this.resultCache = resultCache;
    this.consumer = consumer;
    this.stub = stub;
    this.loc = loc;
    this.regionServerRemote = isRegionServerRemote;
    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
    this.pauseNs = pauseNs;
    this.pauseForCQTBENs = pauseForCQTBENs;
    this.maxAttempts = maxAttempts;
    this.scanTimeoutNs = scanTimeoutNs;
    this.rpcTimeoutNs = rpcTimeoutNs;
    this.startLogErrorsCnt = startLogErrorsCnt;
    if (scan.isReversed()) {
      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
    } else {
      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
    }
    this.future = new CompletableFuture<>();
    this.priority = priority;
    this.controller = conn.rpcControllerFactory.newController();
    this.controller.setPriority(priority);
    this.exceptions = new ArrayList<>();
  }

  // The time elapsed since the start of the current 'next' operation, in milliseconds.
  private long elapsedMs() {
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
  }

  // The time left for the current 'next' operation before reaching the scan timeout, could be
  // zero or negative. Only meaningful when scanTimeoutNs is greater than zero.
  private long remainingTimeNs() {
    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
  }

  // Ask the region server to close the scanner. This is best effort, a failure will only be
  // logged.
  private void closeScanner() {
    incRPCCallsMetrics(scanMetrics, regionServerRemote);
    resetController(controller, rpcTimeoutNs, priority);
    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
    stub.scan(controller, req, resp -> {
      if (controller.failed()) {
        LOG.warn(
          "Call to {} for closing scanner id = {} for {} of {} failed, ignore, " +
            "probably already closed",
          loc.getServerName(), scannerId, loc.getRegion().getEncodedName(),
          loc.getRegion().getTable(), controller.getFailed());
      }
    });
  }

  // Give up the scan and fail the future with a RetriesExhaustedException which carries all the
  // failures recorded for the current 'next' operation.
  private void completeExceptionally(boolean closeScanner) {
    resultCache.clear();
    if (closeScanner) {
      closeScanner();
    }
    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
  }

  private void completeNoMoreResults() {
    future.complete(false);
  }

  // Set the start row of the scan and tell the upper layer to go on.
  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
    scan.withStartRow(row, inclusive);
    future.complete(true);
  }

  // Give up the current scanner but not the scan. We will restart from the last row which has
  // been passed to the consumer, if any, and tell the upper layer to go on.
  private void completeWhenError(boolean closeScanner) {
    // The second parameter is the remote flag of the region server, as in call(), not the
    // closeScanner flag.
    incRPCRetriesMetrics(scanMetrics, regionServerRemote);
    resultCache.clear();
    if (closeScanner) {
      closeScanner();
    }
    if (nextStartRowWhenError != null) {
      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
    }
    future.complete(true);
  }

  // Handle a failed scan request: record the failure, and then either fail the future, complete
  // with true so that the upper layer goes on, or schedule a retry after a pause.
  private void onError(Throwable error) {
    Throwable translated = translateException(error);
    if (tries > startLogErrorsCnt) {
      LOG.warn(
        "Call to {} for scanner id = {} for {} of {} failed, tries = {}, maxAttempts = {}, " +
          "timeout = {} ms, time elapsed = {} ms",
        loc.getServerName(), scannerId, loc.getRegion().getEncodedName(),
        loc.getRegion().getTable(), tries, maxAttempts,
        TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs), elapsedMs(), translated);
    }
    // For these exceptions we consider the scanner as already closed, so we will not send a
    // close scanner request when completing.
    boolean scannerClosed = translated instanceof UnknownScannerException ||
      translated instanceof NotServingRegionException ||
      translated instanceof RegionServerStoppedException ||
      translated instanceof ScannerResetException;
    RetriesExhaustedException.ThrowableWithExtraContext qt =
      new RetriesExhaustedException.ThrowableWithExtraContext(translated,
        EnvironmentEdgeManager.currentTime(), "");
    exceptions.add(qt);
    if (tries >= maxAttempts) {
      completeExceptionally(!scannerClosed);
      return;
    }
    long delayNs;
    long pauseNsToUse = translated instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
    if (scanTimeoutNs > 0) {
      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
      if (maxDelayNs <= 0) {
        // no time left for another try
        completeExceptionally(!scannerClosed);
        return;
      }
      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
    } else {
      delayNs = getPauseTime(pauseNsToUse, tries - 1);
    }
    if (scannerClosed) {
      completeWhenError(false);
      return;
    }
    if (translated instanceof OutOfOrderScannerNextException) {
      completeWhenError(true);
      return;
    }
    if (translated instanceof DoNotRetryIOException) {
      completeExceptionally(true);
      return;
    }
    tries++;
    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
  }

  // Record where to restart if we fail later, called with the last result passed to the consumer.
  private void updateNextStartRowWhenError(Result result) {
    nextStartRowWhenError = result.getRow();
    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
  }

  private void completeWhenNoMoreResultsInRegion() {
    if (noMoreResultsForScan(scan, loc.getRegion())) {
      completeNoMoreResults();
    } else {
      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
    }
  }

  private void completeReversedWhenNoMoreResultsInRegion() {
    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
      completeNoMoreResults();
    } else {
      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
    }
  }

  // Called when a response has been fully processed, either directly from onComplete or from
  // ScanResumerImpl.resume. Complete the future if the scan or the region is exhausted, otherwise
  // send the next scan request.
  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
    if (resp.hasMoreResults() && !resp.getMoreResults()) {
      // RS tells us there is no more data for the whole scan
      completeNoMoreResults();
      return;
    }
    if (scan.getLimit() > 0) {
      // The RS should have set the moreResults field in ScanResponse to false when we have reached
      // the limit, so we add an assert here.
      int newLimit = scan.getLimit() - numberOfCompleteRows;
      assert newLimit > 0;
      scan.setLimit(newLimit);
    }
    // as in 2.0 this value will always be set
    if (!resp.getMoreResultsInRegion()) {
      completeWhenNoMoreResultsInRegion.run();
      return;
    }
    next();
  }

  // Process the response of a scan request: decode the results, pass them (or a heartbeat) to
  // the consumer, and then act according to what the consumer did with the ScanController.
  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
    if (controller.failed()) {
      onError(controller.getFailed());
      return;
    }
    updateServerSideMetrics(scanMetrics, resp);
    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
    Result[] rawResults;
    Result[] results;
    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
    try {
      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
      results = resultCache.addAndGet(
        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
        isHeartbeatMessage);
    } catch (IOException e) {
      // We can not retry here. The server has responded normally and the call sequence has been
      // increased so a new scan with the same call sequence will cause an
      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
      LOG.warn("decode scan response failed", e);
      completeWhenError(true);
      return;
    }

    ScanControllerImpl scanController;
    if (results.length > 0) {
      scanController = new ScanControllerImpl(
        resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
          : Optional.empty());
      updateNextStartRowWhenError(results[results.length - 1]);
      consumer.onNext(results, scanController);
    } else {
      Optional<Cursor> cursor = Optional.empty();
      if (resp.hasCursor()) {
        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
      } else if (scan.isNeedCursorResult() && rawResults != null && rawResults.length > 0) {
        // rawResults is treated as nullable when passing it to the result cache above, so we also
        // need the null check here.
        // It is size limit exceed and we need to return the last Result's row.
        // When user setBatch and the scanner is reopened, the server may return Results that
        // user has seen and the last Result can not be seen because the number is not enough.
        // So the row keys of results may not be same, we must use the last one.
        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
      }
      scanController = new ScanControllerImpl(cursor);
      if (isHeartbeatMessage || cursor.isPresent()) {
        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
        // want to pass a cursor to upper layer.
        consumer.onHeartbeat(scanController);
      }
    }
    // the controller can not be used any more after this point.
    ScanControllerState state = scanController.destroy();
    if (state == ScanControllerState.TERMINATED) {
      if (resp.getMoreResultsInRegion()) {
        // we have more results in region but user request to stop the scan, so we need to close the
        // scanner explicitly.
        closeScanner();
      }
      completeNoMoreResults();
      return;
    }
    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
    if (state == ScanControllerState.SUSPENDED) {
      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
        // suspended successfully, the scan will go on when the user calls resume.
        return;
      }
      // the user has already called resume, fall through and go on.
    }
    completeOrNext(resp, numberOfCompleteRows);
  }

  // Send the scan request for the current 'next' operation, also used when retrying, in which
  // case the call sequence is not changed.
  private void call() {
    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
    // less than the scan timeout. If the server does not respond in time(usually this will not
    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
    // resending the next request and the only way to fix this is to close the scanner and open a
    // new one.
    long callTimeoutNs;
    if (scanTimeoutNs > 0) {
      long remainingNs = remainingTimeNs();
      if (remainingNs <= 0) {
        completeExceptionally(true);
        return;
      }
      callTimeoutNs = remainingNs;
    } else {
      callTimeoutNs = 0L;
    }
    incRPCCallsMetrics(scanMetrics, regionServerRemote);
    if (tries > 1) {
      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
    }
    resetController(controller, callTimeoutNs, priority);
    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
    stub.scan(controller, req, resp -> onComplete(controller, resp));
  }

  // Start a new 'next' operation: increase the call sequence, reset the retry state and the start
  // time, and then send the request.
  private void next() {
    nextCallSeq++;
    tries = 1;
    exceptions.clear();
    nextCallStartNs = System.nanoTime();
    call();
  }

  // Send a renew lease request for a suspended scan. The call sequence is increased and the
  // response is ignored.
  private void renewLease() {
    incRPCCallsMetrics(scanMetrics, regionServerRemote);
    nextCallSeq++;
    resetController(controller, rpcTimeoutNs, priority);
    ScanRequest req =
      RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
    stub.scan(controller, req, resp -> {
    });
  }

  /**
   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
   * open scanner request is also needed because we may have some data in the CellScanner which is
   * contained in the controller.
   * @param controller the controller used for the open scanner request
   * @param respWhenOpen the response of the open scanner request
   * @return a future which will be completed with {@code true} if we should continue, otherwise
   *         {@code false}.
   */
  public CompletableFuture<Boolean> start(HBaseRpcController controller,
    ScanResponse respWhenOpen) {
    onComplete(controller, respWhenOpen);
    return future;
  }
}