001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; 023 024import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 025 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.util.ArrayDeque; 029import java.util.Queue; 030import java.util.concurrent.ExecutorService; 031 032import org.apache.commons.lang3.mutable.MutableBoolean; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; 035import org.apache.hadoop.hbase.DoNotRetryIOException; 036import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; 037import org.apache.hadoop.hbase.exceptions.ScannerResetException; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.HRegionInfo; 040import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 041import org.apache.hadoop.hbase.NotServingRegionException; 042import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.UnknownScannerException; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 051 052/** 053 * Implements the scanner interface for the HBase client. If there are multiple regions in a table, 054 * this scanner will iterate through them all. 055 */ 056@InterfaceAudience.Private 057public abstract class ClientScanner extends AbstractClientScanner { 058 059 private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class); 060 061 protected final Scan scan; 062 protected boolean closed = false; 063 // Current region scanner is against. Gets cleared if current region goes 064 // wonky: e.g. if it splits on us. 065 protected HRegionInfo currentRegion = null; 066 protected ScannerCallableWithReplicas callable = null; 067 protected Queue<Result> cache; 068 private final ScanResultCache scanResultCache; 069 protected final int caching; 070 protected long lastNext; 071 // Keep lastResult returned successfully in case we have to reset scanner. 072 protected Result lastResult = null; 073 protected final long maxScannerResultSize; 074 private final ClusterConnection connection; 075 protected final TableName tableName; 076 protected final int scannerTimeout; 077 protected boolean scanMetricsPublished = false; 078 protected RpcRetryingCaller<Result[]> caller; 079 protected RpcControllerFactory rpcControllerFactory; 080 protected Configuration conf; 081 // The timeout on the primary. Applicable if there are multiple replicas for a region 082 // In that case, we will only wait for this much timeout on the primary before going 083 // to the replicas and trying the same scan. Note that the retries will still happen 084 // on each replica and the first successful results will be taken. A timeout of 0 is 085 // disallowed. 086 protected final int primaryOperationTimeout; 087 private int retries; 088 protected final ExecutorService pool; 089 090 /** 091 * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start 092 * row maybe changed changed. 093 * @param conf The {@link Configuration} to use. 094 * @param scan {@link Scan} to use in this scanner 095 * @param tableName The table that we wish to scan 096 * @param connection Connection identifying the cluster 097 * @throws IOException 098 */ 099 public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, 100 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, 101 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) 102 throws IOException { 103 if (LOG.isTraceEnabled()) { 104 LOG.trace( 105 "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); 106 } 107 this.scan = scan; 108 this.tableName = tableName; 109 this.lastNext = System.currentTimeMillis(); 110 this.connection = connection; 111 this.pool = pool; 112 this.primaryOperationTimeout = primaryOperationTimeout; 113 this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 114 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 115 if (scan.getMaxResultSize() > 0) { 116 this.maxScannerResultSize = scan.getMaxResultSize(); 117 } else { 118 this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 119 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); 120 } 121 this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 122 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); 123 124 // check if application wants to collect scan metrics 125 initScanMetrics(scan); 126 127 // Use the caching from the Scan. If not set, use the default cache setting for this table. 128 if (this.scan.getCaching() > 0) { 129 this.caching = this.scan.getCaching(); 130 } else { 131 this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 132 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); 133 } 134 135 this.caller = rpcFactory.<Result[]> newCaller(); 136 this.rpcControllerFactory = controllerFactory; 137 138 this.conf = conf; 139 140 this.scanResultCache = createScanResultCache(scan); 141 initCache(); 142 } 143 144 protected ClusterConnection getConnection() { 145 return this.connection; 146 } 147 148 protected TableName getTable() { 149 return this.tableName; 150 } 151 152 protected int getRetries() { 153 return this.retries; 154 } 155 156 protected int getScannerTimeout() { 157 return this.scannerTimeout; 158 } 159 160 protected Configuration getConf() { 161 return this.conf; 162 } 163 164 protected Scan getScan() { 165 return scan; 166 } 167 168 protected ExecutorService getPool() { 169 return pool; 170 } 171 172 protected int getPrimaryOperationTimeout() { 173 return primaryOperationTimeout; 174 } 175 176 protected int getCaching() { 177 return caching; 178 } 179 180 protected long getTimestamp() { 181 return lastNext; 182 } 183 184 @VisibleForTesting 185 protected long getMaxResultSize() { 186 return maxScannerResultSize; 187 } 188 189 private void closeScanner() throws IOException { 190 if (this.callable != null) { 191 this.callable.setClose(); 192 call(callable, caller, scannerTimeout, false); 193 this.callable = null; 194 } 195 } 196 197 /** 198 * Will be called in moveToNextRegion when currentRegion is null. Abstract because for normal 199 * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we 200 * will start next scan from the startKey of the currentRegion. 201 * @return {@code false} if we have reached the stop row. Otherwise {@code true}. 202 */ 203 protected abstract boolean setNewStartKey(); 204 205 /** 206 * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed 207 * scan we need to create a ReversedScannerCallable. 208 */ 209 protected abstract ScannerCallable createScannerCallable(); 210 211 /** 212 * Close the previous scanner and create a new ScannerCallable for the next scanner. 213 * <p> 214 * Marked as protected only because TestClientScanner need to override this method. 215 * @return false if we should terminate the scan. Otherwise 216 */ 217 @VisibleForTesting 218 protected boolean moveToNextRegion() { 219 // Close the previous scanner if it's open 220 try { 221 closeScanner(); 222 } catch (IOException e) { 223 // not a big deal continue 224 if (LOG.isDebugEnabled()) { 225 LOG.debug("close scanner for " + currentRegion + " failed", e); 226 } 227 } 228 if (currentRegion != null) { 229 if (!setNewStartKey()) { 230 return false; 231 } 232 scan.resetMvccReadPoint(); 233 if (LOG.isTraceEnabled()) { 234 LOG.trace("Finished " + this.currentRegion); 235 } 236 } 237 if (LOG.isDebugEnabled() && this.currentRegion != null) { 238 // Only worth logging if NOT first region in scan. 239 LOG.debug( 240 "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow()) + 241 "', " + (scan.includeStartRow() ? "inclusive" : "exclusive")); 242 } 243 // clear the current region, we will set a new value to it after the first call of the new 244 // callable. 245 this.currentRegion = null; 246 this.callable = 247 new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, 248 primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller); 249 this.callable.setCaching(this.caching); 250 incRegionCountMetrics(scanMetrics); 251 return true; 252 } 253 254 @VisibleForTesting 255 boolean isAnyRPCcancelled() { 256 return callable.isAnyRPCcancelled(); 257 } 258 259 private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller, 260 int scannerTimeout, boolean updateCurrentRegion) throws IOException { 261 if (Thread.interrupted()) { 262 throw new InterruptedIOException(); 263 } 264 // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, 265 // we do a callWithRetries 266 Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout); 267 if (currentRegion == null && updateCurrentRegion) { 268 currentRegion = callable.getHRegionInfo(); 269 } 270 return rrs; 271 } 272 273 /** 274 * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the 275 * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics 276 * framework because it doesn't support multi-instances of the same metrics on the same machine; 277 * for scan/map reduce scenarios, we will have multiple scans running at the same time. By 278 * default, scan metrics are disabled; if the application wants to collect them, this behavior can 279 * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)} 280 */ 281 protected void writeScanMetrics() { 282 if (this.scanMetrics == null || scanMetricsPublished) { 283 return; 284 } 285 // Publish ScanMetrics to the Scan Object. 286 // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not 287 // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published 288 // to Scan will be messed up. 289 scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, 290 ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray()); 291 scanMetricsPublished = true; 292 } 293 294 protected void initSyncCache() { 295 cache = new ArrayDeque<>(); 296 } 297 298 protected Result nextWithSyncCache() throws IOException { 299 Result result = cache.poll(); 300 if (result != null) { 301 return result; 302 } 303 // If there is nothing left in the cache and the scanner is closed, 304 // return a no-op 305 if (this.closed) { 306 return null; 307 } 308 309 loadCache(); 310 311 // try again to load from cache 312 result = cache.poll(); 313 314 // if we exhausted this scanner before calling close, write out the scan metrics 315 if (result == null) { 316 writeScanMetrics(); 317 } 318 return result; 319 } 320 321 @VisibleForTesting 322 public int getCacheSize() { 323 return cache != null ? cache.size() : 0; 324 } 325 326 private boolean scanExhausted(Result[] values) { 327 return callable.moreResultsForScan() == MoreResults.NO; 328 } 329 330 private boolean regionExhausted(Result[] values) { 331 // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. And in the 332 // old time we always return empty result for a open scanner operation so we add a check here to 333 // keep compatible with the old logic. Should remove the isOpenScanner in the future. 334 // 2. Server tells us that it has no more results for this region. 335 return (values.length == 0 && !callable.isHeartbeatMessage()) || 336 callable.moreResultsInRegion() == MoreResults.NO; 337 } 338 339 private void closeScannerIfExhausted(boolean exhausted) throws IOException { 340 if (exhausted) { 341 closeScanner(); 342 } 343 } 344 345 private void handleScanError(DoNotRetryIOException e, 346 MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException { 347 // An exception was thrown which makes any partial results that we were collecting 348 // invalid. The scanner will need to be reset to the beginning of a row. 349 scanResultCache.clear(); 350 351 // Unfortunately, DNRIOE is used in two different semantics. 352 // (1) The first is to close the client scanner and bubble up the exception all the way 353 // to the application. This is preferred when the exception is really un-recoverable 354 // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this 355 // bucket usually. 356 // (2) Second semantics is to close the current region scanner only, but continue the 357 // client scanner by overriding the exception. This is usually UnknownScannerException, 358 // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the 359 // application-level ClientScanner has to continue without bubbling up the exception to 360 // the client. See RSRpcServices to see how it throws DNRIOE's. 361 // See also: HBASE-16604, HBASE-17187 362 363 // If exception is any but the list below throw it back to the client; else setup 364 // the scanner and retry. 365 Throwable cause = e.getCause(); 366 if ((cause != null && cause instanceof NotServingRegionException) || 367 (cause != null && cause instanceof RegionServerStoppedException) || 368 e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException || 369 e instanceof ScannerResetException) { 370 // Pass. It is easier writing the if loop test as list of what is allowed rather than 371 // as a list of what is not allowed... so if in here, it means we do not throw. 372 if (retriesLeft <= 0) { 373 throw e; // no more retries 374 } 375 } else { 376 throw e; 377 } 378 379 // Else, its signal from depths of ScannerCallable that we need to reset the scanner. 380 if (this.lastResult != null) { 381 // The region has moved. We need to open a brand new scanner at the new location. 382 // Reset the startRow to the row we've seen last so that the new scanner starts at 383 // the correct row. Otherwise we may see previously returned rows again. 384 // If the lastRow is not partial, then we should start from the next row. As now we can 385 // exclude the start row, the logic here is the same for both normal scan and reversed scan. 386 // If lastResult is partial then include it, otherwise exclude it. 387 scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow()); 388 } 389 if (e instanceof OutOfOrderScannerNextException) { 390 if (retryAfterOutOfOrderException.isTrue()) { 391 retryAfterOutOfOrderException.setValue(false); 392 } else { 393 // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE? 394 throw new DoNotRetryIOException( 395 "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e); 396 } 397 } 398 // Clear region. 399 this.currentRegion = null; 400 // Set this to zero so we don't try and do an rpc and close on remote server when 401 // the exception we got was UnknownScanner or the Server is going down. 402 callable = null; 403 } 404 405 /** 406 * Contact the servers to load more {@link Result}s in the cache. 407 */ 408 protected void loadCache() throws IOException { 409 // check if scanner was closed during previous prefetch 410 if (closed) { 411 return; 412 } 413 long remainingResultSize = maxScannerResultSize; 414 int countdown = this.caching; 415 // This is possible if we just stopped at the boundary of a region in the previous call. 416 if (callable == null && !moveToNextRegion()) { 417 closed = true; 418 return; 419 } 420 // This flag is set when we want to skip the result returned. We do 421 // this when we reset scanner because it split under us. 422 MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true); 423 // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should 424 // make sure that we are not retrying indefinitely. 425 int retriesLeft = getRetries(); 426 for (;;) { 427 Result[] values; 428 try { 429 // Server returns a null values if scanning is to stop. Else, 430 // returns an empty array if scanning is to go on and we've just 431 // exhausted current region. 432 // now we will also fetch data when openScanner, so do not make a next call again if values 433 // is already non-null. 434 values = call(callable, caller, scannerTimeout, true); 435 // When the replica switch happens, we need to do certain operations again. 436 // The callable will openScanner with the right startkey but we need to pick up 437 // from there. Bypass the rest of the loop and let the catch-up happen in the beginning 438 // of the loop as it happens for the cases where we see exceptions. 439 if (callable.switchedToADifferentReplica()) { 440 // Any accumulated partial results are no longer valid since the callable will 441 // openScanner with the correct startkey and we must pick up from there 442 scanResultCache.clear(); 443 this.currentRegion = callable.getHRegionInfo(); 444 } 445 retryAfterOutOfOrderException.setValue(true); 446 } catch (DoNotRetryIOException e) { 447 handleScanError(e, retryAfterOutOfOrderException, retriesLeft--); 448 // reopen the scanner 449 if (!moveToNextRegion()) { 450 break; 451 } 452 continue; 453 } 454 long currentTime = System.currentTimeMillis(); 455 if (this.scanMetrics != null) { 456 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext); 457 } 458 lastNext = currentTime; 459 // Groom the array of Results that we received back from the server before adding that 460 // Results to the scanner's cache. If partial results are not allowed to be seen by the 461 // caller, all book keeping will be performed within this method. 462 int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows(); 463 Result[] resultsToAddToCache = 464 scanResultCache.addAndGet(values, callable.isHeartbeatMessage()); 465 int numberOfCompleteRows = 466 scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; 467 for (Result rs : resultsToAddToCache) { 468 cache.add(rs); 469 long estimatedHeapSizeOfResult = calcEstimatedSize(rs); 470 countdown--; 471 remainingResultSize -= estimatedHeapSizeOfResult; 472 addEstimatedSize(estimatedHeapSizeOfResult); 473 this.lastResult = rs; 474 } 475 476 if (scan.getLimit() > 0) { 477 int newLimit = scan.getLimit() - numberOfCompleteRows; 478 assert newLimit >= 0; 479 scan.setLimit(newLimit); 480 } 481 if (scan.getLimit() == 0 || scanExhausted(values)) { 482 closeScanner(); 483 closed = true; 484 break; 485 } 486 boolean regionExhausted = regionExhausted(values); 487 if (callable.isHeartbeatMessage()) { 488 if (!cache.isEmpty()) { 489 // Caller of this method just wants a Result. If we see a heartbeat message, it means 490 // processing of the scan is taking a long time server side. Rather than continue to 491 // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing 492 // unnecesary delays to the caller 493 LOG.trace("Heartbeat message received and cache contains Results. " + 494 "Breaking out of scan loop"); 495 // we know that the region has not been exhausted yet so just break without calling 496 // closeScannerIfExhausted 497 break; 498 } 499 } 500 if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) { 501 if (callable.isHeartbeatMessage() && callable.getCursor() != null) { 502 // Use cursor row key from server 503 cache.add(Result.createCursorResult(callable.getCursor())); 504 break; 505 } 506 if (values.length > 0) { 507 // It is size limit exceed and we need return the last Result's row. 508 // When user setBatch and the scanner is reopened, the server may return Results that 509 // user has seen and the last Result can not be seen because the number is not enough. 510 // So the row keys of results may not be same, we must use the last one. 511 cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow()))); 512 break; 513 } 514 } 515 if (countdown <= 0) { 516 // we have enough result. 517 closeScannerIfExhausted(regionExhausted); 518 break; 519 } 520 if (remainingResultSize <= 0) { 521 if (!cache.isEmpty()) { 522 closeScannerIfExhausted(regionExhausted); 523 break; 524 } else { 525 // we have reached the max result size but we still can not find anything to return to the 526 // user. Reset the maxResultSize and try again. 527 remainingResultSize = maxScannerResultSize; 528 } 529 } 530 // we are done with the current region 531 if (regionExhausted) { 532 if (!moveToNextRegion()) { 533 closed = true; 534 break; 535 } 536 } 537 } 538 } 539 540 protected void addEstimatedSize(long estimatedHeapSizeOfResult) { 541 return; 542 } 543 544 @VisibleForTesting 545 public int getCacheCount() { 546 return cache != null ? cache.size() : 0; 547 } 548 549 @Override 550 public void close() { 551 if (!scanMetricsPublished) writeScanMetrics(); 552 if (callable != null) { 553 callable.setClose(); 554 try { 555 call(callable, caller, scannerTimeout, false); 556 } catch (UnknownScannerException e) { 557 // We used to catch this error, interpret, and rethrow. However, we 558 // have since decided that it's not nice for a scanner's close to 559 // throw exceptions. Chances are it was just due to lease time out. 560 LOG.debug("scanner failed to close", e); 561 } catch (IOException e) { 562 /* An exception other than UnknownScanner is unexpected. */ 563 LOG.warn("scanner failed to close.", e); 564 } 565 callable = null; 566 } 567 closed = true; 568 } 569 570 @Override 571 public boolean renewLease() { 572 if (callable == null) { 573 return false; 574 } 575 // do not return any rows, do not advance the scanner 576 callable.setRenew(true); 577 try { 578 this.caller.callWithoutRetries(callable, this.scannerTimeout); 579 return true; 580 } catch (Exception e) { 581 LOG.debug("scanner failed to renew lease", e); 582 return false; 583 } finally { 584 callable.setRenew(false); 585 } 586 } 587 588 protected void initCache() { 589 initSyncCache(); 590 } 591 592 @Override 593 public Result next() throws IOException { 594 return nextWithSyncCache(); 595 } 596}