001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; 027import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; 028import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; 029import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; 030 031import io.opentelemetry.context.Context; 032import io.opentelemetry.context.Scope; 033import java.io.IOException; 034import java.util.ArrayList; 035import java.util.List; 036import java.util.Optional; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.TimeUnit; 039import org.apache.hadoop.hbase.DoNotRetryIOException; 040import org.apache.hadoop.hbase.HBaseServerException; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionLocation; 043import org.apache.hadoop.hbase.NotServingRegionException; 044import org.apache.hadoop.hbase.UnknownScannerException; 045import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer; 046import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 047import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 048import org.apache.hadoop.hbase.exceptions.ScannerResetException; 049import org.apache.hadoop.hbase.ipc.HBaseRpcController; 050import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 057import org.apache.hbase.thirdparty.io.netty.util.Timeout; 058import org.apache.hbase.thirdparty.io.netty.util.Timer; 059 060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 061import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 062import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 067 068/** 069 * Retry caller for scanning a region. 070 * <p> 071 * We will modify the {@link Scan} object passed in directly. The upper layer should store the 072 * reference of this object and use it to open new single region scanners. 073 */ 074@InterfaceAudience.Private 075class AsyncScanSingleRegionRpcRetryingCaller { 076 077 private static final Logger LOG = 078 LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class); 079 080 private final Timer retryTimer; 081 082 private final Scan scan; 083 084 private final ScanMetrics scanMetrics; 085 086 private final long scannerId; 087 088 private final ScanResultCache resultCache; 089 090 private final AdvancedScanResultConsumer consumer; 091 092 private final ClientService.Interface stub; 093 094 private final HRegionLocation loc; 095 096 private final boolean regionServerRemote; 097 098 private final int priority; 099 100 private final long scannerLeaseTimeoutPeriodNs; 101 102 private final long pauseNs; 103 104 private final long pauseNsForServerOverloaded; 105 106 private final int maxAttempts; 107 108 private final long scanTimeoutNs; 109 110 private final long rpcTimeoutNs; 111 112 private final int startLogErrorsCnt; 113 114 private final Runnable completeWhenNoMoreResultsInRegion; 115 116 private final AsyncConnectionImpl conn; 117 118 private final CompletableFuture<Boolean> future; 119 120 private final HBaseRpcController controller; 121 122 private byte[] nextStartRowWhenError; 123 124 private boolean includeNextStartRowWhenError; 125 126 private long nextCallStartNs; 127 128 private int tries; 129 130 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions; 131 132 private long nextCallSeq = -1L; 133 134 private enum ScanControllerState { 135 INITIALIZED, 136 SUSPENDED, 137 TERMINATED, 138 DESTROYED 139 } 140 141 // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments 142 // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid 143 // usage. We use two things to prevent invalid usage: 144 // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an 145 // IllegalStateException if the caller thread is not this thread. 146 // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will 147 // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to 148 // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will 149 // call destroy to get the current state and set the state to DESTROYED. And when user calls 150 // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw 151 // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call 152 // suspend or terminate so the state will still be INITIALIZED when back from onNext or 153 // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller 154 // to be used in the future. 155 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 156 // package private methods can only be called within the implementation of 157 // AsyncScanSingleRegionRpcRetryingCaller. 158 private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController { 159 160 // Make sure the methods are only called in this thread. 161 private final Thread callerThread; 162 163 private final Optional<Cursor> cursor; 164 165 // INITIALIZED -> SUSPENDED -> DESTROYED 166 // INITIALIZED -> TERMINATED -> DESTROYED 167 // INITIALIZED -> DESTROYED 168 // If the state is incorrect we will throw IllegalStateException. 169 private ScanControllerState state = ScanControllerState.INITIALIZED; 170 171 private ScanResumerImpl resumer; 172 173 public ScanControllerImpl(Optional<Cursor> cursor) { 174 this.callerThread = Thread.currentThread(); 175 this.cursor = cursor; 176 } 177 178 private void preCheck() { 179 Preconditions.checkState(Thread.currentThread() == callerThread, 180 "The current thread is %s, expected thread is %s, " 181 + "you should not call this method outside onNext or onHeartbeat", 182 Thread.currentThread(), callerThread); 183 Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED), 184 "Invalid Stopper state %s", state); 185 } 186 187 @Override 188 public ScanResumer suspend() { 189 preCheck(); 190 state = ScanControllerState.SUSPENDED; 191 ScanResumerImpl resumer = new ScanResumerImpl(); 192 this.resumer = resumer; 193 return resumer; 194 } 195 196 @Override 197 public void terminate() { 198 preCheck(); 199 state = ScanControllerState.TERMINATED; 200 } 201 202 // return the current state, and set the state to DESTROYED. 203 ScanControllerState destroy() { 204 ScanControllerState state = this.state; 205 this.state = ScanControllerState.DESTROYED; 206 return state; 207 } 208 209 @Override 210 public Optional<Cursor> cursor() { 211 return cursor; 212 } 213 } 214 215 private enum ScanResumerState { 216 INITIALIZED, 217 SUSPENDED, 218 RESUMED 219 } 220 221 // The resume method is allowed to be called in another thread so here we also use the 222 // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back 223 // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED, 224 // and when user calls resume method, we will change the state to RESUMED. But the resume method 225 // could be called in other thread, and in fact, user could just do this: 226 // controller.suspend().resume() 227 // This is strange but valid. This means the scan could be resumed before we call the prepare 228 // method to do the actual suspend work. So in the resume method, we will check if the state is 229 // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare 230 // method, if the state is RESUMED already, we will just return an let the scan go on. 231 // Notice that, the public methods of this class is supposed to be called by upper layer only, and 232 // package private methods can only be called within the implementation of 233 // AsyncScanSingleRegionRpcRetryingCaller. 234 private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { 235 236 // INITIALIZED -> SUSPENDED -> RESUMED 237 // INITIALIZED -> RESUMED 238 private ScanResumerState state = ScanResumerState.INITIALIZED; 239 240 private ScanResponse resp; 241 242 private int numberOfCompleteRows; 243 244 // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed 245 // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task 246 // every time when the previous task is finished. There could also be race as the renewal is 247 // executed in the timer thread, so we also need to check the state before lease renewal. If the 248 // state is RESUMED already, we will give up lease renewal and also not schedule the next lease 249 // renewal task. 250 private Timeout leaseRenewer; 251 252 @Override 253 public void resume() { 254 // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we 255 // just return at the first if condition without loading the resp and numValidResuls field. If 256 // resume is called after suspend, then it is also safe to just reference resp and 257 // numValidResults after the synchronized block as no one will change it anymore. 258 ScanResponse localResp; 259 int localNumberOfCompleteRows; 260 synchronized (this) { 261 if (state == ScanResumerState.INITIALIZED) { 262 // user calls this method before we call prepare, so just set the state to 263 // RESUMED, the implementation will just go on. 264 state = ScanResumerState.RESUMED; 265 return; 266 } 267 if (state == ScanResumerState.RESUMED) { 268 // already resumed, give up. 269 return; 270 } 271 state = ScanResumerState.RESUMED; 272 if (leaseRenewer != null) { 273 leaseRenewer.cancel(); 274 } 275 localResp = this.resp; 276 localNumberOfCompleteRows = this.numberOfCompleteRows; 277 } 278 completeOrNext(localResp, localNumberOfCompleteRows); 279 } 280 281 private void scheduleRenewLeaseTask() { 282 leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2, 283 TimeUnit.NANOSECONDS); 284 } 285 286 private synchronized void tryRenewLease() { 287 // the scan has already been resumed, give up 288 if (state == ScanResumerState.RESUMED) { 289 return; 290 } 291 renewLease(); 292 // schedule the next renew lease task again as this is a one-time task. 293 scheduleRenewLeaseTask(); 294 } 295 296 // return false if the scan has already been resumed. See the comment above for ScanResumerImpl 297 // for more details. 298 synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) { 299 if (state == ScanResumerState.RESUMED) { 300 // user calls resume before we actually suspend the scan, just continue; 301 return false; 302 } 303 state = ScanResumerState.SUSPENDED; 304 this.resp = resp; 305 this.numberOfCompleteRows = numberOfCompleteRows; 306 // if there are no more results in region then the scanner at RS side will be closed 307 // automatically so we do not need to renew lease. 308 if (resp.getMoreResultsInRegion()) { 309 // schedule renew lease task 310 scheduleRenewLeaseTask(); 311 } 312 return true; 313 } 314 } 315 316 public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, 317 Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, 318 AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, 319 boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, 320 long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, 321 int startLogErrorsCnt) { 322 this.retryTimer = retryTimer; 323 this.conn = conn; 324 this.scan = scan; 325 this.scanMetrics = scanMetrics; 326 this.scannerId = scannerId; 327 this.resultCache = resultCache; 328 this.consumer = consumer; 329 this.stub = stub; 330 this.loc = loc; 331 this.regionServerRemote = isRegionServerRemote; 332 this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; 333 this.pauseNs = pauseNs; 334 this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; 335 this.maxAttempts = maxAttempts; 336 this.scanTimeoutNs = scanTimeoutNs; 337 this.rpcTimeoutNs = rpcTimeoutNs; 338 this.startLogErrorsCnt = startLogErrorsCnt; 339 if (scan.isReversed()) { 340 completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; 341 } else { 342 completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; 343 } 344 this.future = new CompletableFuture<>(); 345 this.priority = priority; 346 this.controller = conn.rpcControllerFactory.newController(); 347 this.controller.setPriority(priority); 348 this.exceptions = new ArrayList<>(); 349 } 350 351 private long elapsedMs() { 352 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); 353 } 354 355 private long remainingTimeNs() { 356 return scanTimeoutNs - (System.nanoTime() - nextCallStartNs); 357 } 358 359 private void closeScanner() { 360 incRPCCallsMetrics(scanMetrics, regionServerRemote); 361 resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS); 362 ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); 363 stub.scan(controller, req, resp -> { 364 if (controller.failed()) { 365 LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId 366 + " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() 367 + " failed, ignore, probably already closed", controller.getFailed()); 368 } 369 }); 370 } 371 372 private void completeExceptionally(boolean closeScanner) { 373 resultCache.clear(); 374 if (closeScanner) { 375 closeScanner(); 376 } 377 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); 378 } 379 380 private void completeNoMoreResults() { 381 future.complete(false); 382 } 383 384 private void completeWithNextStartRow(byte[] row, boolean inclusive) { 385 scan.withStartRow(row, inclusive); 386 future.complete(true); 387 } 388 389 private void completeWhenError(boolean closeScanner) { 390 incRPCRetriesMetrics(scanMetrics, closeScanner); 391 resultCache.clear(); 392 if (closeScanner) { 393 closeScanner(); 394 } 395 if (nextStartRowWhenError != null) { 396 scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError); 397 } 398 future.complete(true); 399 } 400 401 private void onError(Throwable error) { 402 error = translateException(error); 403 if (tries > startLogErrorsCnt) { 404 LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " 405 + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() 406 + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " 407 + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() 408 + " ms", error); 409 } 410 boolean scannerClosed = 411 error instanceof UnknownScannerException || error instanceof NotServingRegionException 412 || error instanceof RegionServerStoppedException || error instanceof ScannerResetException; 413 RetriesExhaustedException.ThrowableWithExtraContext qt = 414 new RetriesExhaustedException.ThrowableWithExtraContext(error, 415 EnvironmentEdgeManager.currentTime(), ""); 416 exceptions.add(qt); 417 if (tries >= maxAttempts) { 418 completeExceptionally(!scannerClosed); 419 return; 420 } 421 long delayNs; 422 long pauseNsToUse = 423 HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs; 424 if (scanTimeoutNs > 0) { 425 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; 426 if (maxDelayNs <= 0) { 427 completeExceptionally(!scannerClosed); 428 return; 429 } 430 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1)); 431 } else { 432 delayNs = getPauseTime(pauseNsToUse, tries - 1); 433 } 434 if (scannerClosed) { 435 completeWhenError(false); 436 return; 437 } 438 if (error instanceof OutOfOrderScannerNextException) { 439 completeWhenError(true); 440 return; 441 } 442 if (error instanceof DoNotRetryIOException) { 443 completeExceptionally(true); 444 return; 445 } 446 tries++; 447 448 if (HBaseServerException.isServerOverloaded(error)) { 449 Optional<MetricsConnection> metrics = conn.getConnectionMetrics(); 450 metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); 451 } 452 retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS); 453 } 454 455 private void updateNextStartRowWhenError(Result result) { 456 nextStartRowWhenError = result.getRow(); 457 includeNextStartRowWhenError = result.mayHaveMoreCellsInRow(); 458 } 459 460 private void completeWhenNoMoreResultsInRegion() { 461 if (noMoreResultsForScan(scan, loc.getRegion())) { 462 completeNoMoreResults(); 463 } else { 464 completeWithNextStartRow(loc.getRegion().getEndKey(), true); 465 } 466 } 467 468 private void completeReversedWhenNoMoreResultsInRegion() { 469 if (noMoreResultsForReverseScan(scan, loc.getRegion())) { 470 completeNoMoreResults(); 471 } else { 472 completeWithNextStartRow(loc.getRegion().getStartKey(), false); 473 } 474 } 475 476 private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) { 477 if (resp.hasMoreResults() && !resp.getMoreResults()) { 478 // RS tells us there is no more data for the whole scan 479 completeNoMoreResults(); 480 return; 481 } 482 if (scan.getLimit() > 0) { 483 // The RS should have set the moreResults field in ScanResponse to false when we have reached 484 // the limit, so we add an assert here. 485 int newLimit = scan.getLimit() - numberOfCompleteRows; 486 assert newLimit > 0; 487 scan.setLimit(newLimit); 488 } 489 // as in 2.0 this value will always be set 490 if (!resp.getMoreResultsInRegion()) { 491 completeWhenNoMoreResultsInRegion.run(); 492 return; 493 } 494 next(); 495 } 496 497 private void onComplete(HBaseRpcController controller, ScanResponse resp) { 498 if (controller.failed()) { 499 onError(controller.getFailed()); 500 return; 501 } 502 updateServerSideMetrics(scanMetrics, resp); 503 boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage(); 504 Result[] rawResults; 505 Result[] results; 506 int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows(); 507 try { 508 rawResults = ResponseConverter.getResults(controller.cellScanner(), resp); 509 updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage); 510 results = resultCache.addAndGet( 511 Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY), 512 isHeartbeatMessage); 513 } catch (IOException e) { 514 // We can not retry here. The server has responded normally and the call sequence has been 515 // increased so a new scan with the same call sequence will cause an 516 // OutOfOrderScannerNextException. Let the upper layer open a new scanner. 517 LOG.warn("decode scan response failed", e); 518 completeWhenError(true); 519 return; 520 } 521 522 ScanControllerImpl scanController; 523 if (results.length > 0) { 524 scanController = new ScanControllerImpl( 525 resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) : Optional.empty()); 526 updateNextStartRowWhenError(results[results.length - 1]); 527 consumer.onNext(results, scanController); 528 } else { 529 Optional<Cursor> cursor = Optional.empty(); 530 if (resp.hasCursor()) { 531 cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor())); 532 } else if (scan.isNeedCursorResult() && rawResults.length > 0) { 533 // It is size limit exceed and we need to return the last Result's row. 534 // When user setBatch and the scanner is reopened, the server may return Results that 535 // user has seen and the last Result can not be seen because the number is not enough. 536 // So the row keys of results may not be same, we must use the last one. 537 cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow())); 538 } 539 scanController = new ScanControllerImpl(cursor); 540 if (isHeartbeatMessage || cursor.isPresent()) { 541 // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we 542 // want to pass a cursor to upper layer. 543 consumer.onHeartbeat(scanController); 544 } 545 } 546 ScanControllerState state = scanController.destroy(); 547 if (state == ScanControllerState.TERMINATED) { 548 if (resp.getMoreResultsInRegion()) { 549 // we have more results in region but user request to stop the scan, so we need to close the 550 // scanner explicitly. 551 closeScanner(); 552 } 553 completeNoMoreResults(); 554 return; 555 } 556 int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; 557 if (state == ScanControllerState.SUSPENDED) { 558 if (scanController.resumer.prepare(resp, numberOfCompleteRows)) { 559 return; 560 } 561 } 562 completeOrNext(resp, numberOfCompleteRows); 563 } 564 565 private void call() { 566 // As we have a call sequence for scan, it is useless to have a different rpc timeout which is 567 // less than the scan timeout. If the server does not respond in time(usually this will not 568 // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when 569 // resending the next request and the only way to fix this is to close the scanner and open a 570 // new one. 571 long callTimeoutNs; 572 if (scanTimeoutNs > 0) { 573 long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs); 574 if (remainingNs <= 0) { 575 completeExceptionally(true); 576 return; 577 } 578 callTimeoutNs = remainingNs; 579 } else { 580 callTimeoutNs = 0L; 581 } 582 incRPCCallsMetrics(scanMetrics, regionServerRemote); 583 if (tries > 1) { 584 incRPCRetriesMetrics(scanMetrics, regionServerRemote); 585 } 586 resetController(controller, callTimeoutNs, priority); 587 ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, 588 nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit()); 589 final Context context = Context.current(); 590 stub.scan(controller, req, resp -> { 591 try (Scope ignored = context.makeCurrent()) { 592 onComplete(controller, resp); 593 } 594 }); 595 } 596 597 private void next() { 598 nextCallSeq++; 599 tries = 1; 600 exceptions.clear(); 601 nextCallStartNs = System.nanoTime(); 602 call(); 603 } 604 605 private void renewLease() { 606 incRPCCallsMetrics(scanMetrics, regionServerRemote); 607 nextCallSeq++; 608 resetController(controller, rpcTimeoutNs, priority); 609 ScanRequest req = 610 RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); 611 stub.scan(controller, req, resp -> { 612 }); 613 } 614 615 /** 616 * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also 617 * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the 618 * open scanner request is also needed because we may have some data in the CellScanner which is 619 * contained in the controller. 620 * @return {@code true} if we should continue, otherwise {@code false}. 621 */ 622 public CompletableFuture<Boolean> start(HBaseRpcController controller, 623 ScanResponse respWhenOpen) { 624 onComplete(controller, respWhenOpen); 625 return future; 626 } 627}