001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; 028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; 029 030import io.opentelemetry.context.Context; 031import io.opentelemetry.context.Scope; 032import java.io.IOException; 033import java.util.ArrayList; 034import java.util.List; 035import java.util.Map; 036import java.util.Optional; 037import java.util.OptionalLong; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.TimeUnit; 040import org.apache.hadoop.hbase.DoNotRetryIOException; 041import org.apache.hadoop.hbase.HBaseServerException; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.HRegionLocation; 044import org.apache.hadoop.hbase.NotServingRegionException; 045import org.apache.hadoop.hbase.UnknownScannerException; 046import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer; 047import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager; 048import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 049import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 050import org.apache.hadoop.hbase.exceptions.ScannerResetException; 051import org.apache.hadoop.hbase.ipc.HBaseRpcController; 052import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 059import org.apache.hbase.thirdparty.io.netty.util.Timeout; 060import org.apache.hbase.thirdparty.io.netty.util.Timer; 061 062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 063import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 064import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 069 070/** 071 * Retry caller for scanning a region. 072 * <p> 073 * We will modify the {@link Scan} object passed in directly. The upper layer should store the 074 * reference of this object and use it to open new single region scanners. 075 */ 076@InterfaceAudience.Private 077class AsyncScanSingleRegionRpcRetryingCaller { 078 079 private static final Logger LOG = 080 LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class); 081 082 private final Timer retryTimer; 083 084 private final Scan scan; 085 086 private final ScanMetrics scanMetrics; 087 088 private final long scannerId; 089 090 private final ScanResultCache resultCache; 091 092 private final AdvancedScanResultConsumer consumer; 093 094 private final ClientService.Interface stub; 095 096 private final HRegionLocation loc; 097 098 private final boolean regionServerRemote; 099 100 private final int priority; 101 102 private final long scannerLeaseTimeoutPeriodNs; 103 104 private final int maxAttempts; 105 106 private final long scanTimeoutNs; 107 108 private final long rpcTimeoutNs; 109 110 private final int startLogErrorsCnt; 111 112 private final Runnable completeWhenNoMoreResultsInRegion; 113 114 private final AsyncConnectionImpl conn; 115 116 private final CompletableFuture<Boolean> future; 117 118 private final HBaseRpcController controller; 119 120 private byte[] nextStartRowWhenError; 121 122 private boolean includeNextStartRowWhenError; 123 124 private long lastNextCallNanos; 125 126 private long nextCallStartNanos; 127 128 private int tries; 129 130 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions; 131 132 private long nextCallSeq = -1L; 133 134 private final HBaseServerExceptionPauseManager pauseManager; 135 136 private enum ScanControllerState { 137 INITIALIZED, 138 SUSPENDED, 139 TERMINATED, 140 DESTROYED 141 } 142 143 // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments 144 // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid 145 // usage. We use two things to prevent invalid usage: 146 // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an 147 // IllegalStateException if the caller thread is not this thread. 148 // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will 149 // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to 150 // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will 151 // call destroy to get the current state and set the state to DESTROYED. And when user calls 152 // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw 153 // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call 154 // suspend or terminate so the state will still be INITIALIZED when back from onNext or 155 // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller 156 // to be used in the future. 157 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 158 // package private methods can only be called within the implementation of 159 // AsyncScanSingleRegionRpcRetryingCaller. 160 private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController { 161 162 // Make sure the methods are only called in this thread. 163 private final Thread callerThread; 164 165 private final Optional<Cursor> cursor; 166 167 // INITIALIZED -> SUSPENDED -> DESTROYED 168 // INITIALIZED -> TERMINATED -> DESTROYED 169 // INITIALIZED -> DESTROYED 170 // If the state is incorrect we will throw IllegalStateException. 171 private ScanControllerState state = ScanControllerState.INITIALIZED; 172 173 private ScanResumerImpl resumer; 174 175 public ScanControllerImpl(Optional<Cursor> cursor) { 176 this.callerThread = Thread.currentThread(); 177 this.cursor = cursor; 178 } 179 180 private void preCheck() { 181 Preconditions.checkState(Thread.currentThread() == callerThread, 182 "The current thread is %s, expected thread is %s, " 183 + "you should not call this method outside onNext or onHeartbeat", 184 Thread.currentThread(), callerThread); 185 Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED), 186 "Invalid Stopper state %s", state); 187 } 188 189 @Override 190 public ScanResumer suspend() { 191 preCheck(); 192 state = ScanControllerState.SUSPENDED; 193 ScanResumerImpl resumer = new ScanResumerImpl(); 194 this.resumer = resumer; 195 return resumer; 196 } 197 198 @Override 199 public void terminate() { 200 preCheck(); 201 state = ScanControllerState.TERMINATED; 202 } 203 204 // return the current state, and set the state to DESTROYED. 205 ScanControllerState destroy() { 206 ScanControllerState state = this.state; 207 this.state = ScanControllerState.DESTROYED; 208 return state; 209 } 210 211 @Override 212 public Optional<Cursor> cursor() { 213 return cursor; 214 } 215 } 216 217 private enum ScanResumerState { 218 INITIALIZED, 219 SUSPENDED, 220 RESUMED 221 } 222 223 // The resume method is allowed to be called in another thread so here we also use the 224 // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back 225 // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED, 226 // and when user calls resume method, we will change the state to RESUMED. But the resume method 227 // could be called in other thread, and in fact, user could just do this: 228 // controller.suspend().resume() 229 // This is strange but valid. This means the scan could be resumed before we call the prepare 230 // method to do the actual suspend work. So in the resume method, we will check if the state is 231 // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare 232 // method, if the state is RESUMED already, we will just return an let the scan go on. 233 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 234 // package private methods can only be called within the implementation of 235 // AsyncScanSingleRegionRpcRetryingCaller. 236 @InterfaceAudience.Private 237 final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { 238 239 // INITIALIZED -> SUSPENDED -> RESUMED 240 // INITIALIZED -> RESUMED 241 private ScanResumerState state = ScanResumerState.INITIALIZED; 242 243 private ScanResponse resp; 244 245 private int numberOfCompleteRows; 246 247 // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed 248 // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task 249 // every time when the previous task is finished. There could also be race as the renewal is 250 // executed in the timer thread, so we also need to check the state before lease renewal. If the 251 // state is RESUMED already, we will give up lease renewal and also not schedule the next lease 252 // renewal task. 253 private Timeout leaseRenewer; 254 255 @Override 256 public void resume() { 257 doResume(false); 258 } 259 260 /** 261 * This method is used when {@link ScanControllerImpl#suspend} had ever been called to get a 262 * {@link ScanResumerImpl}, but now user stops scan and does not need any more scan results. 263 */ 264 public void terminate() { 265 doResume(true); 266 } 267 268 private void doResume(boolean stopScan) { 269 // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we 270 // just return at the first if condition without loading the resp and numValidResuls field. If 271 // resume is called after suspend, then it is also safe to just reference resp and 272 // numValidResults after the synchronized block as no one will change it anymore. 273 ScanResponse localResp; 274 int localNumberOfCompleteRows; 275 synchronized (this) { 276 if (state == ScanResumerState.INITIALIZED) { 277 // user calls this method before we call prepare, so just set the state to 278 // RESUMED, the implementation will just go on. 279 state = ScanResumerState.RESUMED; 280 return; 281 } 282 if (state == ScanResumerState.RESUMED) { 283 // already resumed, give up. 284 return; 285 } 286 state = ScanResumerState.RESUMED; 287 if (leaseRenewer != null) { 288 leaseRenewer.cancel(); 289 } 290 localResp = this.resp; 291 localNumberOfCompleteRows = this.numberOfCompleteRows; 292 } 293 if (stopScan) { 294 stopScan(localResp); 295 } else { 296 completeOrNext(localResp, localNumberOfCompleteRows); 297 } 298 } 299 300 private void scheduleRenewLeaseTask() { 301 leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2, 302 TimeUnit.NANOSECONDS); 303 } 304 305 private synchronized void tryRenewLease() { 306 // the scan has already been resumed, give up 307 if (state == ScanResumerState.RESUMED) { 308 return; 309 } 310 renewLease(); 311 // schedule the next renew lease task again as this is a one-time task. 312 scheduleRenewLeaseTask(); 313 } 314 315 // return false if the scan has already been resumed. See the comment above for ScanResumerImpl 316 // for more details. 317 synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) { 318 if (state == ScanResumerState.RESUMED) { 319 // user calls resume before we actually suspend the scan, just continue; 320 return false; 321 } 322 state = ScanResumerState.SUSPENDED; 323 this.resp = resp; 324 this.numberOfCompleteRows = numberOfCompleteRows; 325 // if there are no more results in region then the scanner at RS side will be closed 326 // automatically so we do not need to renew lease. 327 if (resp.getMoreResultsInRegion()) { 328 // schedule renew lease task 329 scheduleRenewLeaseTask(); 330 } 331 return true; 332 } 333 } 334 335 public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, 336 Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, 337 AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, 338 boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, 339 long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, 340 long lastNextCallNanos, int startLogErrorsCnt, Map<String, byte[]> requestAttributes) { 341 this.retryTimer = retryTimer; 342 this.conn = conn; 343 this.scan = scan; 344 this.scanMetrics = scanMetrics; 345 this.scannerId = scannerId; 346 this.resultCache = resultCache; 347 this.consumer = consumer; 348 this.stub = stub; 349 this.loc = loc; 350 this.regionServerRemote = isRegionServerRemote; 351 this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; 352 this.maxAttempts = maxAttempts; 353 this.scanTimeoutNs = scanTimeoutNs; 354 this.rpcTimeoutNs = rpcTimeoutNs; 355 this.lastNextCallNanos = lastNextCallNanos; 356 this.startLogErrorsCnt = startLogErrorsCnt; 357 if (scan.isReversed()) { 358 completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; 359 } else { 360 completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; 361 } 362 this.future = new CompletableFuture<>(); 363 this.priority = priority; 364 this.controller = conn.rpcControllerFactory.newController(); 365 this.controller.setRequestAttributes(requestAttributes); 366 this.exceptions = new ArrayList<>(); 367 this.pauseManager = 368 new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs); 369 } 370 371 public long getLastNextCallNanos() { 372 return lastNextCallNanos; 373 } 374 375 private long elapsedMs() { 376 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNanos); 377 } 378 379 private void closeScanner() { 380 incRPCCallsMetrics(scanMetrics, regionServerRemote); 381 resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS, loc.getRegion().getTable()); 382 ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); 383 stub.scan(controller, req, resp -> { 384 if (controller.failed()) { 385 LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId 386 + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() 387 + " failed, ignore, probably already closed", controller.getFailed()); 388 } 389 }); 390 } 391 392 private void completeExceptionally(boolean closeScanner) { 393 resultCache.clear(); 394 if (closeScanner) { 395 closeScanner(); 396 } 397 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); 398 } 399 400 private void completeNoMoreResults() { 401 future.complete(false); 402 } 403 404 private void completeWithNextStartRow(byte[] row, boolean inclusive) { 405 scan.withStartRow(row, inclusive); 406 future.complete(true); 407 } 408 409 private void completeWhenError(boolean closeScanner) { 410 incRPCRetriesMetrics(scanMetrics, closeScanner); 411 resultCache.clear(); 412 if (closeScanner) { 413 closeScanner(); 414 } 415 if (nextStartRowWhenError != null) { 416 scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError); 417 } 418 future.complete(true); 419 } 420 421 private void onError(Throwable error) { 422 error = translateException(error); 423 if (tries > startLogErrorsCnt) { 424 LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " 425 + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() 426 + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " 427 + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() 428 + " ms", error); 429 } 430 boolean scannerClosed = 431 error instanceof UnknownScannerException || error instanceof NotServingRegionException 432 || error instanceof RegionServerStoppedException || error instanceof ScannerResetException; 433 RetriesExhaustedException.ThrowableWithExtraContext qt = 434 new RetriesExhaustedException.ThrowableWithExtraContext(error, 435 EnvironmentEdgeManager.currentTime(), ""); 436 exceptions.add(qt); 437 if (tries >= maxAttempts) { 438 completeExceptionally(!scannerClosed); 439 return; 440 } 441 442 OptionalLong maybePauseNsToUse = 443 pauseManager.getPauseNsFromException(error, tries, nextCallStartNanos); 444 if (!maybePauseNsToUse.isPresent()) { 445 completeExceptionally(!scannerClosed); 446 return; 447 } 448 long delayNs = maybePauseNsToUse.getAsLong(); 449 if (scannerClosed) { 450 completeWhenError(false); 451 return; 452 } 453 if (error instanceof OutOfOrderScannerNextException) { 454 completeWhenError(true); 455 return; 456 } 457 if (error instanceof DoNotRetryIOException) { 458 completeExceptionally(true); 459 return; 460 } 461 tries++; 462 463 if (HBaseServerException.isServerOverloaded(error)) { 464 Optional<MetricsConnection> metrics = conn.getConnectionMetrics(); 465 metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); 466 } 467 retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS); 468 } 469 470 private void updateNextStartRowWhenError(Result result) { 471 nextStartRowWhenError = result.getRow(); 472 includeNextStartRowWhenError = result.mayHaveMoreCellsInRow(); 473 } 474 475 private void completeWhenNoMoreResultsInRegion() { 476 if (noMoreResultsForScan(scan, loc.getRegion())) { 477 completeNoMoreResults(); 478 } else { 479 completeWithNextStartRow(loc.getRegion().getEndKey(), true); 480 } 481 } 482 483 private void completeReversedWhenNoMoreResultsInRegion() { 484 if (noMoreResultsForReverseScan(scan, loc.getRegion())) { 485 completeNoMoreResults(); 486 } else { 487 completeWithNextStartRow(loc.getRegion().getStartKey(), false); 488 } 489 } 490 491 private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) { 492 if (resp.hasMoreResults() && !resp.getMoreResults()) { 493 // RS tells us there is no more data for the whole scan 494 completeNoMoreResults(); 495 return; 496 } 497 if (scan.getLimit() > 0) { 498 // The RS should have set the moreResults field in ScanResponse to false when we have reached 499 // the limit, so we add an assert here. 500 int newLimit = scan.getLimit() - numberOfCompleteRows; 501 assert newLimit > 0; 502 scan.setLimit(newLimit); 503 } 504 // as in 2.0 this value will always be set 505 if (!resp.getMoreResultsInRegion()) { 506 completeWhenNoMoreResultsInRegion.run(); 507 return; 508 } 509 next(); 510 } 511 512 private void onComplete(HBaseRpcController controller, ScanResponse resp) { 513 if (controller.failed()) { 514 onError(controller.getFailed()); 515 return; 516 } 517 updateServerSideMetrics(scanMetrics, resp); 518 boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage(); 519 Result[] rawResults; 520 Result[] results; 521 int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows(); 522 try { 523 rawResults = ResponseConverter.getResults(controller.cellScanner(), resp); 524 updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage); 525 results = resultCache.addAndGet( 526 Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY), 527 isHeartbeatMessage); 528 } catch (IOException e) { 529 // We can not retry here. The server has responded normally and the call sequence has been 530 // increased so a new scan with the same call sequence will cause an 531 // OutOfOrderScannerNextException. Let the upper layer open a new scanner. 532 LOG.warn("decode scan response failed", e); 533 completeWhenError(true); 534 return; 535 } 536 537 ScanControllerImpl scanController; 538 if (results.length > 0) { 539 scanController = new ScanControllerImpl( 540 resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty()); 541 updateNextStartRowWhenError(results[results.length - 1]); 542 consumer.onNext(results, scanController); 543 } else { 544 Optional<Cursor> cursor = Optional.empty(); 545 if (resp.hasCursor()) { 546 cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor())); 547 } else if (scan.isNeedCursorResult() && rawResults.length > 0) { 548 // It is size limit exceed and we need to return the last Result's row. 549 // When user setBatch and the scanner is reopened, the server may return Results that 550 // user has seen and the last Result can not be seen because the number is not enough. 551 // So the row keys of results may not be same, we must use the last one. 552 cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow())); 553 } 554 scanController = new ScanControllerImpl(cursor); 555 if (isHeartbeatMessage || cursor.isPresent()) { 556 // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we 557 // want to pass a cursor to upper layer. 558 consumer.onHeartbeat(scanController); 559 } 560 } 561 ScanControllerState state = scanController.destroy(); 562 if (state == ScanControllerState.TERMINATED) { 563 stopScan(resp); 564 return; 565 } 566 int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; 567 if (state == ScanControllerState.SUSPENDED) { 568 if (scanController.resumer.prepare(resp, numberOfCompleteRows)) { 569 return; 570 } 571 } 572 completeOrNext(resp, numberOfCompleteRows); 573 } 574 575 private void stopScan(ScanResponse resp) { 576 if (resp.getMoreResultsInRegion()) { 577 // we have more results in region but user request to stop the scan, so we need to close the 578 // scanner explicitly. 579 closeScanner(); 580 } 581 completeNoMoreResults(); 582 } 583 584 private void call() { 585 // As we have a call sequence for scan, it is useless to have a different rpc timeout which is 586 // less than the scan timeout. If the server does not respond in time(usually this will not 587 // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when 588 // resending the next request and the only way to fix this is to close the scanner and open a 589 // new one. 590 long callTimeoutNs; 591 if (scanTimeoutNs > 0) { 592 long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNanos); 593 if (remainingNs <= 0) { 594 completeExceptionally(true); 595 return; 596 } 597 callTimeoutNs = remainingNs; 598 } else { 599 callTimeoutNs = 0L; 600 } 601 incRPCCallsMetrics(scanMetrics, regionServerRemote); 602 if (tries > 1) { 603 incRPCRetriesMetrics(scanMetrics, regionServerRemote); 604 } 605 resetController(controller, callTimeoutNs, priority, loc.getRegion().getTable()); 606 ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, 607 nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit()); 608 final Context context = Context.current(); 609 stub.scan(controller, req, resp -> { 610 try (Scope ignored = context.makeCurrent()) { 611 onComplete(controller, resp); 612 } 613 }); 614 } 615 616 private void next() { 617 nextCallSeq++; 618 tries = 1; 619 exceptions.clear(); 620 nextCallStartNanos = System.nanoTime(); 621 incMillisBetweenNextsMetrics(scanMetrics, 622 TimeUnit.NANOSECONDS.toMillis(nextCallStartNanos - lastNextCallNanos)); 623 lastNextCallNanos = nextCallStartNanos; 624 call(); 625 } 626 627 private void renewLease() { 628 incRPCCallsMetrics(scanMetrics, regionServerRemote); 629 nextCallSeq++; 630 resetController(controller, rpcTimeoutNs, priority, loc.getRegion().getTable()); 631 ScanRequest req = 632 RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); 633 stub.scan(controller, req, resp -> { 634 }); 635 } 636 637 /** 638 * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also 639 * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the 640 * open scanner request is also needed because we may have some data in the CellScanner which is 641 * contained in the controller. 642 * @return {@code true} if we should continue, otherwise {@code false}. 643 */ 644 public CompletableFuture<Boolean> start(HBaseRpcController controller, 645 ScanResponse respWhenOpen) { 646 onComplete(controller, respWhenOpen); 647 return future; 648 } 649}