/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.LeaseException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
 * this scanner will iterate through them all.
 */
@InterfaceAudience.Private
public abstract class ClientScanner extends AbstractClientScanner {

  private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class);

  protected final Scan scan;
  protected boolean closed = false;
  // Current region scanner is against. Gets cleared if current region goes
  // wonky: e.g. if it splits on us.
  protected HRegionInfo currentRegion = null;
  protected ScannerCallableWithReplicas callable = null;
  protected Queue<Result> cache;
  private final ScanResultCache scanResultCache;
  protected final int caching;
  protected long lastNext;
  // Keep lastResult returned successfully in case we have to reset scanner.
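  // If the scanner has to be re-opened (e.g. after a region move), handleScanError restarts the
  // scan from this row via scan.withStartRow, including the row only when it may still have more
  // cells (see lastResult.mayHaveMoreCellsInRow()).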
  protected Result lastResult = null;
  protected final long maxScannerResultSize;
  private final ClusterConnection connection;
  protected final TableName tableName;
  protected final int readRpcTimeout;
  protected final int scannerTimeout;
  protected boolean scanMetricsPublished = false;
  protected RpcRetryingCaller<Result[]> caller;
  protected RpcControllerFactory rpcControllerFactory;
  protected Configuration conf;
  protected final Span span;
  // The timeout on the primary. Applicable if there are multiple replicas for a region.
  // In that case, we will only wait for this much timeout on the primary before going
  // to the replicas and trying the same scan. Note that the retries will still happen
  // on each replica and the first successful results will be taken. A timeout of 0 is
  // disallowed.
  protected final int primaryOperationTimeout;
  private int retries;
  protected final ExecutorService pool;

  /**
   * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s start
   * row may be changed.
   * @param conf The {@link Configuration} to use.
   * @param scan {@link Scan} to use in this scanner
   * @param tableName The table that we wish to scan
   * @param connection Connection identifying the cluster
   */
  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
    ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
    RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
    int scannerTimeout, int primaryOperationTimeout) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
    }
    this.scan = scan;
    this.tableName = tableName;
    this.lastNext = EnvironmentEdgeManager.currentTime();
    this.connection = connection;
    this.pool = pool;
    this.primaryOperationTimeout = primaryOperationTimeout;
    this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    if (scan.getMaxResultSize() > 0) {
      this.maxScannerResultSize = scan.getMaxResultSize();
    } else {
      this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
    }
    this.readRpcTimeout = scanReadRpcTimeout;
    this.scannerTimeout = scannerTimeout;

    // check if application wants to collect scan metrics
    initScanMetrics(scan);

    // Use the caching from the Scan. If not set, use the default cache setting for this table.
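    // A minimal usage sketch (illustrative only, not part of this class; "table" is an assumed
    // org.apache.hadoop.hbase.client.Table instance):
    //   Scan scan = new Scan();
    //   scan.setCaching(500);
    //   try (ResultScanner scanner = table.getScanner(scan)) {
    //     for (Result r : scanner) {
    //       // process r
    //     }
    //   }
    // Scan.setCaching takes precedence below; otherwise we fall back to the
    // hbase.client.scanner.caching configuration.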
    if (this.scan.getCaching() > 0) {
      this.caching = this.scan.getCaching();
    } else {
      this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
    }

    this.caller = rpcFactory.<Result[]> newCaller();
    this.rpcControllerFactory = controllerFactory;

    this.conf = conf;
    this.span = Span.current();

    this.scanResultCache = createScanResultCache(scan);
    initCache();
  }

  protected final int getScanReplicaId() {
    return Math.max(scan.getReplicaId(), RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  protected ClusterConnection getConnection() {
    return this.connection;
  }

  protected TableName getTable() {
    return this.tableName;
  }

  protected int getRetries() {
    return this.retries;
  }

  protected int getScannerTimeout() {
    return this.scannerTimeout;
  }

  protected Configuration getConf() {
    return this.conf;
  }

  protected Scan getScan() {
    return scan;
  }

  protected ExecutorService getPool() {
    return pool;
  }

  protected int getPrimaryOperationTimeout() {
    return primaryOperationTimeout;
  }

  protected int getCaching() {
    return caching;
  }

  protected long getTimestamp() {
    return lastNext;
  }

  protected long getMaxResultSize() {
    return maxScannerResultSize;
  }

  private void closeScanner() throws IOException {
    if (this.callable != null) {
      this.callable.setClose();
      call(callable, caller, scannerTimeout, false);
      this.callable = null;
    }
  }

  /**
   * Will be called in moveToNextRegion when currentRegion is null. Abstract because for normal
   * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we
   * will start next scan from the startKey of the currentRegion.
   * @return {@code false} if we have reached the stop row. Otherwise {@code true}.
   */
  protected abstract boolean setNewStartKey();

  /**
   * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed
   * scan we need to create a ReversedScannerCallable.
   */
  protected abstract ScannerCallable createScannerCallable();

  /**
   * Close the previous scanner and create a new ScannerCallable for the next scanner.
   * <p>
   * Marked as protected only because TestClientScanner needs to override this method.
   * @return {@code false} if we should terminate the scan. Otherwise {@code true}.
   */
  protected boolean moveToNextRegion() {
    // Close the previous scanner if it's open
    try {
      closeScanner();
    } catch (IOException e) {
      // not a big deal, continue
      if (LOG.isDebugEnabled()) {
        LOG.debug("close scanner for " + currentRegion + " failed", e);
      }
    }
    if (currentRegion != null) {
      if (!setNewStartKey()) {
        return false;
      }
      scan.resetMvccReadPoint();
      if (LOG.isTraceEnabled()) {
        LOG.trace("Finished " + this.currentRegion);
      }
    }
    if (LOG.isDebugEnabled() && this.currentRegion != null) {
      // Only worth logging if NOT first region in scan.
      LOG.debug(
        "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow())
          + "', " + (scan.includeStartRow() ? "inclusive" : "exclusive"));
    }
    // clear the current region, we will set a new value to it after the first call of the new
    // callable.
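    // The new ScannerCallableWithReplicas wraps the region-level callable from
    // createScannerCallable(); when the region has replicas, it waits up to
    // primaryOperationTimeout on the primary before also trying the replicas (see the
    // primaryOperationTimeout field comment).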
    this.currentRegion = null;
    this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
      createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
      scannerTimeout, caching, conf, caller);
    this.callable.setCaching(this.caching);
    incRegionCountMetrics(scanMetrics);
    return true;
  }

  boolean isAnyRPCcancelled() {
    return callable.isAnyRPCcancelled();
  }

  private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
    int scannerTimeout, boolean updateCurrentRegion) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
    // we do a callWithRetries
    Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
    if (currentRegion == null && updateCurrentRegion) {
      currentRegion = callable.getHRegionInfo();
    }
    return rrs;
  }

  /**
   * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
   * application or TableInputFormat. Later, we could push it to other systems. We don't use the
   * metrics framework because it doesn't support multiple instances of the same metrics on the
   * same machine; for scan/map reduce scenarios, we will have multiple scans running at the same
   * time. By default, scan metrics are disabled; if the application wants to collect them, this
   * behavior can be turned on by calling {@link Scan#setScanMetricsEnabled(boolean)}.
   */
  protected void writeScanMetrics() {
    if (this.scanMetrics == null || scanMetricsPublished) {
      return;
    }
    // Publish ScanMetrics to the Scan Object.
    // As we have claimed in the comment of Scan.getScanMetrics, this relies on the user not
    // calling ResultScanner.getScanMetrics and resetting the ScanMetrics. Otherwise the metrics
    // published to the Scan will be messed up.
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
      ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
    scanMetricsPublished = true;
  }

  protected void initSyncCache() {
    cache = new ArrayDeque<>();
  }

  protected Result nextWithSyncCache() throws IOException {
    Result result = cache.poll();
    if (result != null) {
      return result;
    }
    // If there is nothing left in the cache and the scanner is closed,
    // return null
    if (this.closed) {
      return null;
    }

    loadCache();

    // try again to load from cache
    result = cache.poll();

    // if we exhausted this scanner before calling close, write out the scan metrics
    if (result == null) {
      writeScanMetrics();
    }
    return result;
  }

  public int getCacheSize() {
    return cache != null ? cache.size() : 0;
  }

  private boolean scanExhausted() {
    return callable.moreResultsForScan() == MoreResults.NO;
  }

  private boolean regionExhausted(Result[] values) {
    // 1. Not a heartbeat message and we get nothing: this means the region is exhausted. In the
    // old days we always returned an empty result for an open scanner operation, so we add a
    // check here to stay compatible with the old logic. Should remove the isOpenScanner check in
    // the future.
    // 2. Server tells us that it has no more results for this region.
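    // Either condition makes loadCache() treat this region as exhausted and advance to the next
    // one via moveToNextRegion().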
    return (values.length == 0 && !callable.isHeartbeatMessage())
      || callable.moreResultsInRegion() == MoreResults.NO;
  }

  private void closeScannerIfExhausted(boolean exhausted) throws IOException {
    if (exhausted) {
      closeScanner();
    }
  }

  private void handleScanError(DoNotRetryIOException e,
    MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
    // An exception was thrown which makes any partial results that we were collecting
    // invalid. The scanner will need to be reset to the beginning of a row.
    scanResultCache.clear();

    // Unfortunately, DNRIOE is used in two different semantics.
    // (1) The first is to close the client scanner and bubble up the exception all the way
    // to the application. This is preferred when the exception is really un-recoverable
    // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
    // bucket usually.
    // (2) The second is to close the current region scanner only, but continue the
    // client scanner by overriding the exception. This is usually UnknownScannerException,
    // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
    // application-level ClientScanner has to continue without bubbling up the exception to
    // the client. See RSRpcServices to see how it throws DNRIOE's.
    // See also: HBASE-16604, HBASE-17187

    // If exception is any but the list below throw it back to the client; else setup
    // the scanner and retry.
    Throwable cause = e.getCause();
    if (
      (cause != null && cause instanceof NotServingRegionException)
        || (cause != null && cause instanceof RegionServerStoppedException)
        || e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException
        || e instanceof ScannerResetException || e instanceof LeaseException
    ) {
      // Pass. It is easier writing the if condition as a list of what is allowed rather than
      // as a list of what is not allowed... so if in here, it means we do not throw.
      if (retriesLeft <= 0) {
        throw e; // no more retries
      }
    } else {
      throw e;
    }

    // Else, it's a signal from the depths of ScannerCallable that we need to reset the scanner.
    if (this.lastResult != null) {
      // The region has moved. We need to open a brand new scanner at the new location.
      // Reset the startRow to the row we've seen last so that the new scanner starts at
      // the correct row. Otherwise we may see previously returned rows again.
      // If the lastRow is not partial, then we should start from the next row. As now we can
      // exclude the start row, the logic here is the same for both normal scan and reversed scan.
      // If lastResult is partial then include it, otherwise exclude it.
      scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow());
    }
    if (e instanceof OutOfOrderScannerNextException) {
      if (retryAfterOutOfOrderException.isTrue()) {
        retryAfterOutOfOrderException.setValue(false);
      } else {
        // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
        throw new DoNotRetryIOException(
          "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
      }
    }
    // Clear region.
    this.currentRegion = null;
    // Set this to null so we don't try and do an rpc and close on remote server when
    // the exception we got was UnknownScanner or the Server is going down.
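    // The caller (loadCache) will then invoke moveToNextRegion() to open a fresh scanner at the
    // adjusted start row.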
    callable = null;
  }

  /**
   * Contact the servers to load more {@link Result}s in the cache.
   */
  protected void loadCache() throws IOException {
    // check if scanner was closed during previous prefetch
    if (closed) {
      return;
    }
    long remainingResultSize = maxScannerResultSize;
    int countdown = this.caching;
    // This is possible if we just stopped at the boundary of a region in the previous call.
    if (callable == null && !moveToNextRegion()) {
      closed = true;
      return;
    }
    // This flag is set when we want to skip the result returned. We do
    // this when we reset scanner because it split under us.
    MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
    // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we
    // should make sure that we are not retrying indefinitely.
    int retriesLeft = getRetries();
    for (;;) {
      Result[] values;
      try {
        // Server returns null values if scanning is to stop. Else,
        // returns an empty array if scanning is to go on and we've just
        // exhausted current region.
        // now we will also fetch data when openScanner, so do not make a next call again if values
        // is already non-null.
        values = call(callable, caller, scannerTimeout, true);
        // When the replica switch happens, we need to do certain operations again.
        // The callable will openScanner with the right startkey but we need to pick up
        // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
        // of the loop as it happens for the cases where we see exceptions.
        if (callable.switchedToADifferentReplica()) {
          // Any accumulated partial results are no longer valid since the callable will
          // openScanner with the correct startkey and we must pick up from there
          scanResultCache.clear();
          this.currentRegion = callable.getHRegionInfo();
        }
        retryAfterOutOfOrderException.setValue(true);
      } catch (DoNotRetryIOException e) {
        handleScanError(e, retryAfterOutOfOrderException, retriesLeft--);
        // reopen the scanner
        if (!moveToNextRegion()) {
          break;
        }
        continue;
      }
      long currentTime = EnvironmentEdgeManager.currentTime();
      if (this.scanMetrics != null) {
        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
      }
      lastNext = currentTime;
      // Groom the array of Results that we received back from the server before adding those
      // Results to the scanner's cache. If partial results are not allowed to be seen by the
      // caller, all book keeping will be performed within this method.
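      // The delta in complete rows computed below is also used to decrement the remaining row
      // limit carried on the Scan (scan.getLimit()).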
      int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
      Result[] resultsToAddToCache =
        scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
      int numberOfCompleteRows =
        scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
      for (Result rs : resultsToAddToCache) {
        cache.add(rs);
        long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
        countdown--;
        remainingResultSize -= estimatedHeapSizeOfResult;
        addEstimatedSize(estimatedHeapSizeOfResult);
        this.lastResult = rs;
      }

      if (scan.getLimit() > 0) {
        int newLimit = scan.getLimit() - numberOfCompleteRows;
        assert newLimit >= 0;
        scan.setLimit(newLimit);
      }
      if (scan.getLimit() == 0 || scanExhausted()) {
        closeScanner();
        closed = true;
        break;
      }
      boolean regionExhausted = regionExhausted(values);
      if (callable.isHeartbeatMessage()) {
        if (!cache.isEmpty()) {
          // Caller of this method just wants a Result. If we see a heartbeat message, it means
          // processing of the scan is taking a long time server side. Rather than continue to
          // loop until a limit (e.g. size or caching) is reached, break out early to avoid
          // causing unnecessary delays to the caller
          LOG.trace("Heartbeat message received and cache contains Results. "
            + "Breaking out of scan loop");
          // we know that the region has not been exhausted yet so just break without calling
          // closeScannerIfExhausted
          break;
        }
      }
      if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
        if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
          // Use cursor row key from server
          cache.add(Result.createCursorResult(callable.getCursor()));
          break;
        }
        if (values.length > 0) {
          // The size limit was exceeded and we need to return the last Result's row. When the
          // user sets a batch and the scanner is reopened, the server may return Results that the
          // user has already seen, and the last Result cannot be seen because the count is not
          // enough. So the row keys of the results may not be the same; we must use the last one.
          cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow())));
          break;
        }
      }
      if (countdown <= 0) {
        // we have enough results.
        closeScannerIfExhausted(regionExhausted);
        break;
      }
      if (remainingResultSize <= 0) {
        if (!cache.isEmpty()) {
          closeScannerIfExhausted(regionExhausted);
          break;
        } else {
          // we have reached the max result size but we still cannot find anything to return to
          // the user. Reset the maxResultSize and try again.
          remainingResultSize = maxScannerResultSize;
        }
      }
      // we are done with the current region
      if (regionExhausted) {
        if (!moveToNextRegion()) {
          closed = true;
          break;
        }
      }
    }
  }

  protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
    return;
  }

  public int getCacheCount() {
    return cache != null ? cache.size() : 0;
  }

  @Override
  public void close() {
    try (Scope ignored = span.makeCurrent()) {
      if (!scanMetricsPublished) {
        writeScanMetrics();
      }
      if (callable != null) {
        callable.setClose();
        try {
          call(callable, caller, scannerTimeout, false);
        } catch (UnknownScannerException e) {
          // We used to catch this error, interpret, and rethrow. However, we
          // have since decided that it's not nice for a scanner's close to
          // throw exceptions. Chances are it was just due to lease time out.
          LOG.debug("scanner failed to close", e);
        } catch (IOException e) {
          /* An exception other than UnknownScanner is unexpected. */
          LOG.warn("scanner failed to close.", e);
          span.recordException(e);
          span.setStatus(StatusCode.ERROR);
        }
        callable = null;
      }
      closed = true;
      span.setStatus(StatusCode.OK);
    } finally {
      span.end();
    }
  }

  @Override
  public boolean renewLease() {
    try (Scope ignored = span.makeCurrent()) {
      if (callable == null) {
        return false;
      }
      // do not return any rows, do not advance the scanner
      callable.setRenew(true);
      try {
        this.caller.callWithoutRetries(callable, this.scannerTimeout);
        return true;
      } catch (Exception e) {
        LOG.debug("scanner failed to renew lease", e);
        span.recordException(e);
        return false;
      } finally {
        callable.setRenew(false);
      }
    }
  }

  protected void initCache() {
    initSyncCache();
  }

  @Override
  public Result next() throws IOException {
    try (Scope ignored = span.makeCurrent()) {
      return nextWithSyncCache();
    }
  }
}