/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Retry caller for scanning a region.
 * <p>
 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
 * reference of this object and use it to open new single region scanners.
 * <p>
 * The outcome is reported through the future returned by
 * {@link #start(HBaseRpcController, ScanResponse)}: {@code true} means the passed in {@link Scan}
 * may have been given a new start row and the upper layer should go on with a new scanner,
 * {@code false} means the scan is finished, and an exceptional completion carries a
 * {@link RetriesExhaustedException} holding the errors recorded for the failed call.
 */
@InterfaceAudience.Private
class AsyncScanSingleRegionRpcRetryingCaller {

  private static final Logger LOG =
    LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);

  private final Timer retryTimer;

  private final Scan scan;

  private final ScanMetrics scanMetrics;

  private final long scannerId;

  private final ScanResultCache resultCache;

  private final AdvancedScanResultConsumer consumer;

  private final ClientService.Interface stub;

  private final HRegionLocation loc;

  private final boolean regionServerRemote;

  private final int priority;

  private final long scannerLeaseTimeoutPeriodNs;

  private final long pauseNs;

  private final long pauseNsForServerOverloaded;

  private final int maxAttempts;

  private final long scanTimeoutNs;

  private final long rpcTimeoutNs;

  private final int startLogErrorsCnt;

  // Either completeWhenNoMoreResultsInRegion or completeReversedWhenNoMoreResultsInRegion,
  // selected once in the constructor according to scan.isReversed().
  private final Runnable completeWhenNoMoreResultsInRegion;

  private final CompletableFuture<Boolean> future;

  private final HBaseRpcController controller;

  // Row (and inclusiveness) the scan is restarted from in completeWhenError. Updated from the
  // last Result of every non-empty batch handed to the consumer.
  private byte[] nextStartRowWhenError;

  private boolean includeNextStartRowWhenError;

  // System.nanoTime() taken when the current logical next call started, see next().
  private long nextCallStartNs;

  private int tries;

  private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;

  // Starts at -1 so that the first next() call uses sequence 0.
  private long nextCallSeq = -1L;

  private enum ScanControllerState {
    INITIALIZED,
    SUSPENDED,
    TERMINATED,
    DESTROYED
  }

  // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
  // of AdvancedScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent
  // invalid usage. We use two things to prevent invalid usage:
  // 1. Record the thread that constructs the ScanControllerImpl instance. We will throw an
  // IllegalStateException if the caller thread is not this thread.
  // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
  // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
  // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
  // call destroy to get the current state and set the state to DESTROYED. And when user calls
  // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
  // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
  // suspend or terminate so the state will still be INITIALIZED when back from onNext or
  // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
  // to be used in the future.
  // Notice that, the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
  private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {

    // Make sure the methods are only called in this thread.
    private final Thread callerThread;

    private final Optional<Cursor> cursor;

    // INITIALIZED -> SUSPENDED -> DESTROYED
    // INITIALIZED -> TERMINATED -> DESTROYED
    // INITIALIZED -> DESTROYED
    // If the state is incorrect we will throw IllegalStateException.
    private ScanControllerState state = ScanControllerState.INITIALIZED;

    // Only set by suspend(), so it is non-null whenever the state has been SUSPENDED.
    private ScanResumerImpl resumer;

    public ScanControllerImpl(Optional<Cursor> cursor) {
      this.callerThread = Thread.currentThread();
      this.cursor = cursor;
    }

    // Rejects calls from a foreign thread and calls made after the state has left INITIALIZED.
    private void preCheck() {
      Preconditions.checkState(Thread.currentThread() == callerThread,
        "The current thread is %s, expected thread is %s, "
          + "you should not call this method outside onNext or onHeartbeat",
        Thread.currentThread(), callerThread);
      Preconditions.checkState(state == ScanControllerState.INITIALIZED,
        "Invalid Stopper state %s", state);
    }

    @Override
    public ScanResumer suspend() {
      preCheck();
      state = ScanControllerState.SUSPENDED;
      resumer = new ScanResumerImpl();
      return resumer;
    }

    @Override
    public void terminate() {
      preCheck();
      state = ScanControllerState.TERMINATED;
    }

    // return the current state, and set the state to DESTROYED.
    ScanControllerState destroy() {
      ScanControllerState previous = state;
      state = ScanControllerState.DESTROYED;
      return previous;
    }

    @Override
    public Optional<Cursor> cursor() {
      return cursor;
    }
  }

  private enum ScanResumerState {
    INITIALIZED,
    SUSPENDED,
    RESUMED
  }

  // The resume method is allowed to be called in another thread so here we also use the
  // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
  // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
  // and when user calls resume method, we will change the state to RESUMED. But the resume method
  // could be called in other thread, and in fact, user could just do this:
  // controller.suspend().resume()
  // This is strange but valid. This means the scan could be resumed before we call the prepare
  // method to do the actual suspend work. So in the resume method, we will check if the state is
  // INITIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
  // method, if the state is RESUMED already, we will just return and let the scan go on.
  // Notice that, the public methods of this class are supposed to be called by upper layer only,
  // and package private methods can only be called within the implementation of
  // AsyncScanSingleRegionRpcRetryingCaller.
  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {

    // INITIALIZED -> SUSPENDED -> RESUMED
    // INITIALIZED -> RESUMED
    private ScanResumerState state = ScanResumerState.INITIALIZED;

    private ScanResponse resp;

    private int numberOfCompleteRows;

    // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
    // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
    // every time when the previous task is finished. There could also be race as the renewal is
    // executed in the timer thread, so we also need to check the state before lease renewal. If the
    // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
    // renewal task.
    private Timeout leaseRenewer;

    @Override
    public void resume() {
      // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
      // just return at the first if condition without loading the resp and numberOfCompleteRows
      // field. If resume is called after suspend, then it is also safe to just reference resp and
      // numberOfCompleteRows after the synchronized block as no one will change it anymore.
      ScanResponse localResp;
      int localNumberOfCompleteRows;
      synchronized (this) {
        if (state == ScanResumerState.INITIALIZED) {
          // user calls this method before we call prepare, so just set the state to
          // RESUMED, the implementation will just go on.
          state = ScanResumerState.RESUMED;
          return;
        }
        if (state == ScanResumerState.RESUMED) {
          // already resumed, give up.
          return;
        }
        state = ScanResumerState.RESUMED;
        if (leaseRenewer != null) {
          leaseRenewer.cancel();
        }
        localResp = this.resp;
        localNumberOfCompleteRows = this.numberOfCompleteRows;
      }
      // Continue the scan outside of the lock.
      completeOrNext(localResp, localNumberOfCompleteRows);
    }

    // Schedules a single renewal after half of the scanner lease timeout period.
    private void scheduleRenewLeaseTask() {
      leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
        TimeUnit.NANOSECONDS);
    }

    private synchronized void tryRenewLease() {
      // the scan has already been resumed, give up
      if (state == ScanResumerState.RESUMED) {
        return;
      }
      renewLease();
      // schedule the next renew lease task again as this is a one-time task.
      scheduleRenewLeaseTask();
    }

    // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
    // for more details.
    synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
      if (state == ScanResumerState.RESUMED) {
        // user calls resume before we actually suspend the scan, just continue;
        return false;
      }
      state = ScanResumerState.SUSPENDED;
      this.resp = resp;
      this.numberOfCompleteRows = numberOfCompleteRows;
      // if there are no more results in region then the scanner at RS side will be closed
      // automatically so we do not need to renew lease.
      if (resp.getMoreResultsInRegion()) {
        // schedule renew lease task
        scheduleRenewLeaseTask();
      }
      return true;
    }
  }

  /**
   * Stores the given collaborators and settings, picks the forward or reversed "no more results
   * in region" completion according to {@code scan.isReversed()}, and creates the rpc controller
   * from {@code conn.rpcControllerFactory} with the given {@code priority}. No rpc is sent here;
   * processing begins in {@link #start(HBaseRpcController, ScanResponse)}.
   */
  public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
    Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
    AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
    boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
    long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
    int startLogErrorsCnt) {
    this.retryTimer = retryTimer;
    this.scan = scan;
    this.scanMetrics = scanMetrics;
    this.scannerId = scannerId;
    this.resultCache = resultCache;
    this.consumer = consumer;
    this.stub = stub;
    this.loc = loc;
    this.regionServerRemote = isRegionServerRemote;
    this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
    this.pauseNs = pauseNs;
    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
    this.maxAttempts = maxAttempts;
    this.scanTimeoutNs = scanTimeoutNs;
    this.rpcTimeoutNs = rpcTimeoutNs;
    this.startLogErrorsCnt = startLogErrorsCnt;
    if (scan.isReversed()) {
      completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
    } else {
      completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion;
    }
    this.future = new CompletableFuture<>();
    this.priority = priority;
    this.controller = conn.rpcControllerFactory.newController();
    this.controller.setPriority(priority);
    this.exceptions = new ArrayList<>();
  }

  /** Returns the milliseconds elapsed since the current next call was started. */
  private long elapsedMs() {
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
  }

  /** Returns the nanoseconds left of the scan timeout for the current next call. */
  private long remainingTimeNs() {
    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
  }

  /** Sends a close request for the scanner; a failure is only logged. */
  private void closeScanner() {
    incRPCCallsMetrics(scanMetrics, regionServerRemote);
    resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
    ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
    stub.scan(controller, req, resp -> {
      if (controller.failed()) {
        LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
          + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
          + " failed, ignore, probably already closed", controller.getFailed());
      }
    });
  }

  /**
   * Fails the future with a {@link RetriesExhaustedException} built from the recorded errors.
   * @param closeScanner whether a close request should be sent for the scanner first
   */
  private void completeExceptionally(boolean closeScanner) {
    resultCache.clear();
    if (closeScanner) {
      closeScanner();
    }
    future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
  }

  private void completeNoMoreResults() {
    future.complete(false);
  }

  private void completeWithNextStartRow(byte[] row, boolean inclusive) {
    scan.withStartRow(row, inclusive);
    future.complete(true);
  }

  /**
   * Gives up on this scanner but lets the upper layer go on: the scan is moved to the row recorded
   * by {@link #updateNextStartRowWhenError(Result)} (if any) and the future completes with
   * {@code true}.
   * @param closeScanner whether a close request should be sent for the scanner first
   */
  private void completeWhenError(boolean closeScanner) {
    // The second argument is the "region server is remote" flag, exactly as passed in call(). It
    // used to be closeScanner, which drove the metric from an unrelated flag.
    incRPCRetriesMetrics(scanMetrics, regionServerRemote);
    resultCache.clear();
    if (closeScanner) {
      closeScanner();
    }
    if (nextStartRowWhenError != null) {
      scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
    }
    future.complete(true);
  }

  /**
   * Handles a failed scan call: records the error, then either fails the future (attempts or time
   * exhausted, or a {@link DoNotRetryIOException}), completes via
   * {@link #completeWhenError(boolean)} (the error type indicates the scanner is already closed,
   * or the call is out of order), or schedules a retry on the timer.
   */
  private void onError(Throwable error) {
    error = translateException(error);
    if (tries > startLogErrorsCnt) {
      LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
        + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
        + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
        + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
        + " ms", error);
    }
    // For these errors we treat the scanner as already closed, so no close request is sent.
    boolean scannerClosed =
      error instanceof UnknownScannerException || error instanceof NotServingRegionException
        || error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
    RetriesExhaustedException.ThrowableWithExtraContext qt =
      new RetriesExhaustedException.ThrowableWithExtraContext(error,
        EnvironmentEdgeManager.currentTime(), "");
    exceptions.add(qt);
    if (tries >= maxAttempts) {
      completeExceptionally(!scannerClosed);
      return;
    }
    long delayNs;
    long pauseNsToUse =
      HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
    if (scanTimeoutNs > 0) {
      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
      if (maxDelayNs <= 0) {
        completeExceptionally(!scannerClosed);
        return;
      }
      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
    } else {
      delayNs = getPauseTime(pauseNsToUse, tries - 1);
    }
    if (scannerClosed) {
      completeWhenError(false);
      return;
    }
    if (error instanceof OutOfOrderScannerNextException) {
      completeWhenError(true);
      return;
    }
    if (error instanceof DoNotRetryIOException) {
      completeExceptionally(true);
      return;
    }
    tries++;
    retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
  }

  private void updateNextStartRowWhenError(Result result) {
    nextStartRowWhenError = result.getRow();
    includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
  }

  private void completeWhenNoMoreResultsInRegion() {
    if (noMoreResultsForScan(scan, loc.getRegion())) {
      completeNoMoreResults();
    } else {
      completeWithNextStartRow(loc.getRegion().getEndKey(), true);
    }
  }

  private void completeReversedWhenNoMoreResultsInRegion() {
    if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
      completeNoMoreResults();
    } else {
      completeWithNextStartRow(loc.getRegion().getStartKey(), false);
    }
  }

  /**
   * Decides what happens after a response has been delivered to the consumer: finish the whole
   * scan, finish this region, or issue the next call.
   * @param resp                 the response that has just been processed
   * @param numberOfCompleteRows number of complete rows produced for this response, used to
   *                             decrease the scan limit when one is set
   */
  private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
    if (resp.hasMoreResults() && !resp.getMoreResults()) {
      // RS tells us there is no more data for the whole scan
      completeNoMoreResults();
      return;
    }
    if (scan.getLimit() > 0) {
      // The RS should have set the moreResults field in ScanResponse to false when we have reached
      // the limit, so we add an assert here.
      int newLimit = scan.getLimit() - numberOfCompleteRows;
      assert newLimit > 0;
      scan.setLimit(newLimit);
    }
    // as in 2.0 this value will always be set
    if (!resp.getMoreResultsInRegion()) {
      completeWhenNoMoreResultsInRegion.run();
      return;
    }
    next();
  }

  /**
   * Processes the response of a scan call: decodes the results, passes them (or a
   * heartbeat/cursor) to the consumer, and then acts on what the consumer asked for through the
   * {@link ScanControllerImpl} (terminate, suspend, or simply continue).
   */
  private void onComplete(HBaseRpcController controller, ScanResponse resp) {
    if (controller.failed()) {
      onError(controller.getFailed());
      return;
    }
    updateServerSideMetrics(scanMetrics, resp);
    boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
    Result[] rawResults;
    Result[] results;
    int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
    try {
      rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
      updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
      results = resultCache.addAndGet(
        Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
        isHeartbeatMessage);
    } catch (IOException e) {
      // We can not retry here. The server has responded normally and the call sequence has been
      // increased so a new scan with the same call sequence will cause an
      // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
      LOG.warn("decode scan response failed", e);
      completeWhenError(true);
      return;
    }

    ScanControllerImpl scanController;
    if (results.length > 0) {
      scanController = new ScanControllerImpl(
        resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty());
      updateNextStartRowWhenError(results[results.length - 1]);
      consumer.onNext(results, scanController);
    } else {
      Optional<Cursor> cursor = Optional.empty();
      if (resp.hasCursor()) {
        cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
      } else if (scan.isNeedCursorResult() && rawResults != null && rawResults.length > 0) {
        // rawResults is treated as nullable above (Optional.ofNullable), so guard it here too.
        // It is size limit exceed and we need to return the last Result's row.
        // When user setBatch and the scanner is reopened, the server may return Results that
        // user has seen and the last Result can not be seen because the number is not enough.
        // So the row keys of results may not be same, we must use the last one.
        cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
      }
      scanController = new ScanControllerImpl(cursor);
      if (isHeartbeatMessage || cursor.isPresent()) {
        // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
        // want to pass a cursor to upper layer.
        consumer.onHeartbeat(scanController);
      }
    }
    ScanControllerState state = scanController.destroy();
    if (state == ScanControllerState.TERMINATED) {
      if (resp.getMoreResultsInRegion()) {
        // we have more results in region but user request to stop the scan, so we need to close the
        // scanner explicitly.
        closeScanner();
      }
      completeNoMoreResults();
      return;
    }
    int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
    if (state == ScanControllerState.SUSPENDED) {
      if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
        // Really suspended; ScanResumerImpl.resume() will continue the scan later.
        return;
      }
    }
    completeOrNext(resp, numberOfCompleteRows);
  }

  /** Sends the scan request for the current call sequence, used for first tries and retries. */
  private void call() {
    // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
    // less than the scan timeout. If the server does not respond in time(usually this will not
    // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
    // resending the next request and the only way to fix this is to close the scanner and open a
    // new one.
    long callTimeoutNs;
    if (scanTimeoutNs > 0) {
      long remainingNs = remainingTimeNs();
      if (remainingNs <= 0) {
        completeExceptionally(true);
        return;
      }
      callTimeoutNs = remainingNs;
    } else {
      callTimeoutNs = 0L;
    }
    incRPCCallsMetrics(scanMetrics, regionServerRemote);
    if (tries > 1) {
      incRPCRetriesMetrics(scanMetrics, regionServerRemote);
    }
    resetController(controller, callTimeoutNs, priority);
    ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
      nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
    // Capture the current context so the callback runs with it made current again.
    final Context context = Context.current();
    stub.scan(controller, req, resp -> {
      try (Scope ignored = context.makeCurrent()) {
        onComplete(controller, resp);
      }
    });
  }

  /** Starts a new logical next call: bumps the call sequence and resets the retry state. */
  private void next() {
    nextCallSeq++;
    tries = 1;
    exceptions.clear();
    nextCallStartNs = System.nanoTime();
    call();
  }

  /** Sends a renew-lease scan request; the response is ignored. */
  private void renewLease() {
    incRPCCallsMetrics(scanMetrics, regionServerRemote);
    nextCallSeq++;
    resetController(controller, rpcTimeoutNs, priority);
    ScanRequest req =
      RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
    stub.scan(controller, req, resp -> {
    });
  }

  /**
   * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
   * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
   * open scanner request is also needed because we may have some data in the CellScanner which is
   * contained in the controller.
   * @param controller   the controller used for the open scanner request
   * @param respWhenOpen the response of the open scanner request
   * @return a future which is completed with {@code true} if we should continue, otherwise
   *         {@code false}; it is completed exceptionally with a {@link RetriesExhaustedException}
   *         when the scan of this region fails.
   */
  public CompletableFuture<Boolean> start(HBaseRpcController controller,
    ScanResponse respWhenOpen) {
    onComplete(controller, respWhenOpen);
    return future;
  }
}