001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; 023 024import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 025 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.util.ArrayDeque; 029import java.util.Queue; 030import java.util.concurrent.ExecutorService; 031 032import org.apache.commons.lang3.mutable.MutableBoolean; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; 035import org.apache.hadoop.hbase.DoNotRetryIOException; 036import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 037import org.apache.hadoop.hbase.exceptions.ScannerResetException; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.HRegionInfo; 040import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 041import org.apache.hadoop.hbase.regionserver.LeaseException; 042import org.apache.hadoop.hbase.NotServingRegionException; 043import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.UnknownScannerException; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 052 053/** 054 * Implements the scanner interface for the HBase client. If there are multiple regions in a table, 055 * this scanner will iterate through them all. 056 */ 057@InterfaceAudience.Private 058public abstract class ClientScanner extends AbstractClientScanner { 059 060 private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class); 061 062 protected final Scan scan; 063 protected boolean closed = false; 064 // Current region scanner is against. Gets cleared if current region goes 065 // wonky: e.g. if it splits on us. 066 protected HRegionInfo currentRegion = null; 067 protected ScannerCallableWithReplicas callable = null; 068 protected Queue<Result> cache; 069 private final ScanResultCache scanResultCache; 070 protected final int caching; 071 protected long lastNext; 072 // Keep lastResult returned successfully in case we have to reset scanner. 073 protected Result lastResult = null; 074 protected final long maxScannerResultSize; 075 private final ClusterConnection connection; 076 protected final TableName tableName; 077 protected final int scannerTimeout; 078 protected boolean scanMetricsPublished = false; 079 protected RpcRetryingCaller<Result[]> caller; 080 protected RpcControllerFactory rpcControllerFactory; 081 protected Configuration conf; 082 // The timeout on the primary. Applicable if there are multiple replicas for a region 083 // In that case, we will only wait for this much timeout on the primary before going 084 // to the replicas and trying the same scan. Note that the retries will still happen 085 // on each replica and the first successful results will be taken. A timeout of 0 is 086 // disallowed. 087 protected final int primaryOperationTimeout; 088 private int retries; 089 protected final ExecutorService pool; 090 091 /** 092 * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start 093 * row maybe changed changed. 094 * @param conf The {@link Configuration} to use. 095 * @param scan {@link Scan} to use in this scanner 096 * @param tableName The table that we wish to scan 097 * @param connection Connection identifying the cluster 098 * @throws IOException 099 */ 100 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, 101 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, 102 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) 103 throws IOException { 104 if (LOG.isTraceEnabled()) { 105 LOG.trace( 106 "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); 107 } 108 this.scan = scan; 109 this.tableName = tableName; 110 this.lastNext = System.currentTimeMillis(); 111 this.connection = connection; 112 this.pool = pool; 113 this.primaryOperationTimeout = primaryOperationTimeout; 114 this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 115 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 116 if (scan.getMaxResultSize() > 0) { 117 this.maxScannerResultSize = scan.getMaxResultSize(); 118 } else { 119 this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 120 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 121 } 122 this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 123 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 124 125 // check if application wants to collect scan metrics 126 initScanMetrics(scan); 127 128 // Use the caching from the Scan. If not set, use the default cache setting for this table. 129 if (this.scan.getCaching() > 0) { 130 this.caching = this.scan.getCaching(); 131 } else { 132 this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 133 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); 134 } 135 136 this.caller = rpcFactory.<Result[]> newCaller(); 137 this.rpcControllerFactory = controllerFactory; 138 139 this.conf = conf; 140 141 this.scanResultCache = createScanResultCache(scan); 142 initCache(); 143 } 144 145 protected ClusterConnection getConnection() { 146 return this.connection; 147 } 148 149 protected TableName getTable() { 150 return this.tableName; 151 } 152 153 protected int getRetries() { 154 return this.retries; 155 } 156 157 protected int getScannerTimeout() { 158 return this.scannerTimeout; 159 } 160 161 protected Configuration getConf() { 162 return this.conf; 163 } 164 165 protected Scan getScan() { 166 return scan; 167 } 168 169 protected ExecutorService getPool() { 170 return pool; 171 } 172 173 protected int getPrimaryOperationTimeout() { 174 return primaryOperationTimeout; 175 } 176 177 protected int getCaching() { 178 return caching; 179 } 180 181 protected long getTimestamp() { 182 return lastNext; 183 } 184 185 @VisibleForTesting 186 protected long getMaxResultSize() { 187 return maxScannerResultSize; 188 } 189 190 private void closeScanner() throws IOException { 191 if (this.callable != null) { 192 this.callable.setClose(); 193 call(callable, caller, scannerTimeout, false); 194 this.callable = null; 195 } 196 } 197 198 /** 199 * Will be called in moveToNextRegion when currentRegion is null. Abstract because for normal 200 * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we 201 * will start next scan from the startKey of the currentRegion. 202 * @return {@code false} if we have reached the stop row. Otherwise {@code true}. 203 */ 204 protected abstract boolean setNewStartKey(); 205 206 /** 207 * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed 208 * scan we need to create a ReversedScannerCallable. 209 */ 210 protected abstract ScannerCallable createScannerCallable(); 211 212 /** 213 * Close the previous scanner and create a new ScannerCallable for the next scanner. 214 * <p> 215 * Marked as protected only because TestClientScanner need to override this method. 216 * @return false if we should terminate the scan. Otherwise 217 */ 218 @VisibleForTesting 219 protected boolean moveToNextRegion() { 220 // Close the previous scanner if it's open 221 try { 222 closeScanner(); 223 } catch (IOException e) { 224 // not a big deal continue 225 if (LOG.isDebugEnabled()) { 226 LOG.debug("close scanner for " + currentRegion + " failed", e); 227 } 228 } 229 if (currentRegion != null) { 230 if (!setNewStartKey()) { 231 return false; 232 } 233 scan.resetMvccReadPoint(); 234 if (LOG.isTraceEnabled()) { 235 LOG.trace("Finished " + this.currentRegion); 236 } 237 } 238 if (LOG.isDebugEnabled() && this.currentRegion != null) { 239 // Only worth logging if NOT first region in scan. 240 LOG.debug( 241 "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow()) + 242 "', " + (scan.includeStartRow() ? "inclusive" : "exclusive")); 243 } 244 // clear the current region, we will set a new value to it after the first call of the new 245 // callable. 246 this.currentRegion = null; 247 this.callable = 248 new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, 249 primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller); 250 this.callable.setCaching(this.caching); 251 incRegionCountMetrics(scanMetrics); 252 return true; 253 } 254 255 @VisibleForTesting 256 boolean isAnyRPCcancelled() { 257 return callable.isAnyRPCcancelled(); 258 } 259 260 private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller, 261 int scannerTimeout, boolean updateCurrentRegion) throws IOException { 262 if (Thread.interrupted()) { 263 throw new InterruptedIOException(); 264 } 265 // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, 266 // we do a callWithRetries 267 Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout); 268 if (currentRegion == null && updateCurrentRegion) { 269 currentRegion = callable.getHRegionInfo(); 270 } 271 return rrs; 272 } 273 274 /** 275 * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the 276 * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics 277 * framework because it doesn't support multi-instances of the same metrics on the same machine; 278 * for scan/map reduce scenarios, we will have multiple scans running at the same time. By 279 * default, scan metrics are disabled; if the application wants to collect them, this behavior can 280 * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)} 281 */ 282 protected void writeScanMetrics() { 283 if (this.scanMetrics == null || scanMetricsPublished) { 284 return; 285 } 286 // Publish ScanMetrics to the Scan Object. 287 // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not 288 // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published 289 // to Scan will be messed up. 290 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, 291 ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray()); 292 scanMetricsPublished = true; 293 } 294 295 protected void initSyncCache() { 296 cache = new ArrayDeque<>(); 297 } 298 299 protected Result nextWithSyncCache() throws IOException { 300 Result result = cache.poll(); 301 if (result != null) { 302 return result; 303 } 304 // If there is nothing left in the cache and the scanner is closed, 305 // return a no-op 306 if (this.closed) { 307 return null; 308 } 309 310 loadCache(); 311 312 // try again to load from cache 313 result = cache.poll(); 314 315 // if we exhausted this scanner before calling close, write out the scan metrics 316 if (result == null) { 317 writeScanMetrics(); 318 } 319 return result; 320 } 321 322 @VisibleForTesting 323 public int getCacheSize() { 324 return cache != null ? cache.size() : 0; 325 } 326 327 private boolean scanExhausted(Result[] values) { 328 return callable.moreResultsForScan() == MoreResults.NO; 329 } 330 331 private boolean regionExhausted(Result[] values) { 332 // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. And in the 333 // old time we always return empty result for a open scanner operation so we add a check here to 334 // keep compatible with the old logic. Should remove the isOpenScanner in the future. 335 // 2. Server tells us that it has no more results for this region. 336 return (values.length == 0 && !callable.isHeartbeatMessage()) || 337 callable.moreResultsInRegion() == MoreResults.NO; 338 } 339 340 private void closeScannerIfExhausted(boolean exhausted) throws IOException { 341 if (exhausted) { 342 closeScanner(); 343 } 344 } 345 346 private void handleScanError(DoNotRetryIOException e, 347 MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException { 348 // An exception was thrown which makes any partial results that we were collecting 349 // invalid. The scanner will need to be reset to the beginning of a row. 350 scanResultCache.clear(); 351 352 // Unfortunately, DNRIOE is used in two different semantics. 353 // (1) The first is to close the client scanner and bubble up the exception all the way 354 // to the application. This is preferred when the exception is really un-recoverable 355 // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this 356 // bucket usually. 357 // (2) Second semantics is to close the current region scanner only, but continue the 358 // client scanner by overriding the exception. This is usually UnknownScannerException, 359 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 360 // application-level ClientScanner has to continue without bubbling up the exception to 361 // the client. See RSRpcServices to see how it throws DNRIOE's. 362 // See also: HBASE-16604, HBASE-17187 363 364 // If exception is any but the list below throw it back to the client; else setup 365 // the scanner and retry. 366 Throwable cause = e.getCause(); 367 if ((cause != null && cause instanceof NotServingRegionException) || 368 (cause != null && cause instanceof RegionServerStoppedException) || 369 e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException || 370 e instanceof ScannerResetException || e instanceof LeaseException) { 371 // Pass. It is easier writing the if loop test as list of what is allowed rather than 372 // as a list of what is not allowed... so if in here, it means we do not throw. 373 if (retriesLeft <= 0) { 374 throw e; // no more retries 375 } 376 } else { 377 throw e; 378 } 379 380 // Else, its signal from depths of ScannerCallable that we need to reset the scanner. 381 if (this.lastResult != null) { 382 // The region has moved. We need to open a brand new scanner at the new location. 383 // Reset the startRow to the row we've seen last so that the new scanner starts at 384 // the correct row. Otherwise we may see previously returned rows again. 385 // If the lastRow is not partial, then we should start from the next row. As now we can 386 // exclude the start row, the logic here is the same for both normal scan and reversed scan. 387 // If lastResult is partial then include it, otherwise exclude it. 388 scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow()); 389 } 390 if (e instanceof OutOfOrderScannerNextException) { 391 if (retryAfterOutOfOrderException.isTrue()) { 392 retryAfterOutOfOrderException.setValue(false); 393 } else { 394 // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE? 395 throw new DoNotRetryIOException( 396 "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e); 397 } 398 } 399 // Clear region. 400 this.currentRegion = null; 401 // Set this to zero so we don't try and do an rpc and close on remote server when 402 // the exception we got was UnknownScanner or the Server is going down. 403 callable = null; 404 } 405 406 /** 407 * Contact the servers to load more {@link Result}s in the cache. 408 */ 409 protected void loadCache() throws IOException { 410 // check if scanner was closed during previous prefetch 411 if (closed) { 412 return; 413 } 414 long remainingResultSize = maxScannerResultSize; 415 int countdown = this.caching; 416 // This is possible if we just stopped at the boundary of a region in the previous call. 417 if (callable == null && !moveToNextRegion()) { 418 closed = true; 419 return; 420 } 421 // This flag is set when we want to skip the result returned. We do 422 // this when we reset scanner because it split under us. 423 MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true); 424 // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should 425 // make sure that we are not retrying indefinitely. 426 int retriesLeft = getRetries(); 427 for (;;) { 428 Result[] values; 429 try { 430 // Server returns a null values if scanning is to stop. Else, 431 // returns an empty array if scanning is to go on and we've just 432 // exhausted current region. 433 // now we will also fetch data when openScanner, so do not make a next call again if values 434 // is already non-null. 435 values = call(callable, caller, scannerTimeout, true); 436 // When the replica switch happens, we need to do certain operations again. 437 // The callable will openScanner with the right startkey but we need to pick up 438 // from there. Bypass the rest of the loop and let the catch-up happen in the beginning 439 // of the loop as it happens for the cases where we see exceptions. 440 if (callable.switchedToADifferentReplica()) { 441 // Any accumulated partial results are no longer valid since the callable will 442 // openScanner with the correct startkey and we must pick up from there 443 scanResultCache.clear(); 444 this.currentRegion = callable.getHRegionInfo(); 445 } 446 retryAfterOutOfOrderException.setValue(true); 447 } catch (DoNotRetryIOException e) { 448 handleScanError(e, retryAfterOutOfOrderException, retriesLeft--); 449 // reopen the scanner 450 if (!moveToNextRegion()) { 451 break; 452 } 453 continue; 454 } 455 long currentTime = System.currentTimeMillis(); 456 if (this.scanMetrics != null) { 457 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext); 458 } 459 lastNext = currentTime; 460 // Groom the array of Results that we received back from the server before adding that 461 // Results to the scanner's cache. If partial results are not allowed to be seen by the 462 // caller, all book keeping will be performed within this method. 463 int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows(); 464 Result[] resultsToAddToCache = 465 scanResultCache.addAndGet(values, callable.isHeartbeatMessage()); 466 int numberOfCompleteRows = 467 scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; 468 for (Result rs : resultsToAddToCache) { 469 cache.add(rs); 470 long estimatedHeapSizeOfResult = calcEstimatedSize(rs); 471 countdown--; 472 remainingResultSize -= estimatedHeapSizeOfResult; 473 addEstimatedSize(estimatedHeapSizeOfResult); 474 this.lastResult = rs; 475 } 476 477 if (scan.getLimit() > 0) { 478 int newLimit = scan.getLimit() - numberOfCompleteRows; 479 assert newLimit >= 0; 480 scan.setLimit(newLimit); 481 } 482 if (scan.getLimit() == 0 || scanExhausted(values)) { 483 closeScanner(); 484 closed = true; 485 break; 486 } 487 boolean regionExhausted = regionExhausted(values); 488 if (callable.isHeartbeatMessage()) { 489 if (!cache.isEmpty()) { 490 // Caller of this method just wants a Result. If we see a heartbeat message, it means 491 // processing of the scan is taking a long time server side. Rather than continue to 492 // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing 493 // unnecesary delays to the caller 494 LOG.trace("Heartbeat message received and cache contains Results. " + 495 "Breaking out of scan loop"); 496 // we know that the region has not been exhausted yet so just break without calling 497 // closeScannerIfExhausted 498 break; 499 } 500 } 501 if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) { 502 if (callable.isHeartbeatMessage() && callable.getCursor() != null) { 503 // Use cursor row key from server 504 cache.add(Result.createCursorResult(callable.getCursor())); 505 break; 506 } 507 if (values.length > 0) { 508 // It is size limit exceed and we need return the last Result's row. 509 // When user setBatch and the scanner is reopened, the server may return Results that 510 // user has seen and the last Result can not be seen because the number is not enough. 511 // So the row keys of results may not be same, we must use the last one. 512 cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow()))); 513 break; 514 } 515 } 516 if (countdown <= 0) { 517 // we have enough result. 518 closeScannerIfExhausted(regionExhausted); 519 break; 520 } 521 if (remainingResultSize <= 0) { 522 if (!cache.isEmpty()) { 523 closeScannerIfExhausted(regionExhausted); 524 break; 525 } else { 526 // we have reached the max result size but we still can not find anything to return to the 527 // user. Reset the maxResultSize and try again. 528 remainingResultSize = maxScannerResultSize; 529 } 530 } 531 // we are done with the current region 532 if (regionExhausted) { 533 if (!moveToNextRegion()) { 534 closed = true; 535 break; 536 } 537 } 538 } 539 } 540 541 protected void addEstimatedSize(long estimatedHeapSizeOfResult) { 542 return; 543 } 544 545 @VisibleForTesting 546 public int getCacheCount() { 547 return cache != null ? cache.size() : 0; 548 } 549 550 @Override 551 public void close() { 552 if (!scanMetricsPublished) writeScanMetrics(); 553 if (callable != null) { 554 callable.setClose(); 555 try { 556 call(callable, caller, scannerTimeout, false); 557 } catch (UnknownScannerException e) { 558 // We used to catch this error, interpret, and rethrow. However, we 559 // have since decided that it's not nice for a scanner's close to 560 // throw exceptions. Chances are it was just due to lease time out. 561 LOG.debug("scanner failed to close", e); 562 } catch (IOException e) { 563 /* An exception other than UnknownScanner is unexpected. */ 564 LOG.warn("scanner failed to close.", e); 565 } 566 callable = null; 567 } 568 closed = true; 569 } 570 571 @Override 572 public boolean renewLease() { 573 if (callable == null) { 574 return false; 575 } 576 // do not return any rows, do not advance the scanner 577 callable.setRenew(true); 578 try { 579 this.caller.callWithoutRetries(callable, this.scannerTimeout); 580 return true; 581 } catch (Exception e) { 582 LOG.debug("scanner failed to renew lease", e); 583 return false; 584 } finally { 585 callable.setRenew(false); 586 } 587 } 588 589 protected void initCache() { 590 initSyncCache(); 591 } 592 593 @Override 594 public Result next() throws IOException { 595 return nextWithSyncCache(); 596 } 597}