/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.LeaseException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
 * this scanner will iterate through them all.
 */
@InterfaceAudience.Private
public abstract class ClientScanner extends AbstractClientScanner {

  private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class);

  protected final Scan scan;
  protected boolean closed = false;
  // Current region scanner is against. Gets cleared if current region goes
  // wonky: e.g. if it splits on us.
  protected HRegionInfo currentRegion = null;
  protected ScannerCallableWithReplicas callable = null;
  protected Queue<Result> cache;
  private final ScanResultCache scanResultCache;
  protected final int caching;
  protected long lastNext;
  // Keep lastResult returned successfully in case we have to reset scanner.
  protected Result lastResult = null;
  protected final long maxScannerResultSize;
  private final ClusterConnection connection;
  protected final TableName tableName;
  protected final int scannerTimeout;
  protected boolean scanMetricsPublished = false;
  protected RpcRetryingCaller<Result[]> caller;
  protected RpcControllerFactory rpcControllerFactory;
  protected Configuration conf;
  // The timeout on the primary. Applicable if there are multiple replicas for a region
  // In that case, we will only wait for this much timeout on the primary before going
  // to the replicas and trying the same scan. Note that the retries will still happen
  // on each replica and the first successful results will be taken. A timeout of 0 is
  // disallowed.
  protected final int primaryOperationTimeout;
  // Assigned once in the constructor from HConstants.HBASE_CLIENT_RETRIES_NUMBER.
  private final int retries;
  protected final ExecutorService pool;

  /**
   * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s start
   * row may be changed.
   * @param conf The {@link Configuration} to use.
   * @param scan {@link Scan} to use in this scanner
   * @param tableName The table that we wish to scan
   * @param connection Connection identifying the cluster
   * @param rpcFactory factory used to create the {@link RpcRetryingCaller} for scan RPCs
   * @param controllerFactory stored in {@link #rpcControllerFactory}
   * @param pool executor handed to the {@link ScannerCallableWithReplicas} created per region
   * @param primaryOperationTimeout timeout on the primary replica, see the field comment on
   *          {@link #primaryOperationTimeout}
   * @throws IOException declared by the signature; NOTE(review): no statement in this constructor
   *           visibly throws it, confirm whether subclasses or callers rely on the declaration
   */
  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
      throws IOException {
    // Guard kept because Bytes.toStringBinary would otherwise be evaluated eagerly.
    if (LOG.isTraceEnabled()) {
      LOG.trace("Scan table={}, startRow={}", tableName,
        Bytes.toStringBinary(scan.getStartRow()));
    }
    this.scan = scan;
    this.tableName = tableName;
    this.lastNext = System.currentTimeMillis();
    this.connection = connection;
    this.pool = pool;
    this.primaryOperationTimeout = primaryOperationTimeout;
    this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    if (scan.getMaxResultSize() > 0) {
      this.maxScannerResultSize = scan.getMaxResultSize();
    } else {
      this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
    }
    this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

    // check if application wants to collect scan metrics
    initScanMetrics(scan);

    // Use the caching from the Scan. If not set, use the default cache setting for this table.
    if (this.scan.getCaching() > 0) {
      this.caching = this.scan.getCaching();
    } else {
      this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
    }

    this.caller = rpcFactory.<Result[]> newCaller();
    this.rpcControllerFactory = controllerFactory;

    this.conf = conf;

    this.scanResultCache = createScanResultCache(scan);
    initCache();
  }

  /**
   * @return the replica id of the scan, or {@link RegionReplicaUtil#DEFAULT_REPLICA_ID} when the
   *         scan's replica id is smaller than that default
   */
  protected final int getScanReplicaId() {
    return scan.getReplicaId() >= RegionReplicaUtil.DEFAULT_REPLICA_ID ? scan.getReplicaId() :
      RegionReplicaUtil.DEFAULT_REPLICA_ID;
  }

  protected ClusterConnection getConnection() {
    return this.connection;
  }

  protected TableName getTable() {
    return this.tableName;
  }

  protected int getRetries() {
    return this.retries;
  }

  protected int getScannerTimeout() {
    return this.scannerTimeout;
  }

  protected Configuration getConf() {
    return this.conf;
  }

  protected Scan getScan() {
    return scan;
  }

  protected ExecutorService getPool() {
    return pool;
  }

  protected int getPrimaryOperationTimeout() {
    return primaryOperationTimeout;
  }

  protected int getCaching() {
    return caching;
  }

  /**
   * @return the value of {@link #lastNext}, i.e. the time recorded at construction or after the
   *         most recent successful scan call in {@link #loadCache()}
   */
  protected long getTimestamp() {
    return lastNext;
  }

  protected long getMaxResultSize() {
    return maxScannerResultSize;
  }

  /**
   * Marks the current callable as a close request, issues it, and then drops the callable. Does
   * nothing when there is no current callable.
   * @throws IOException if the close call fails; in that case {@link #callable} is left untouched
   */
  private void closeScanner() throws IOException {
    if (this.callable != null) {
      this.callable.setClose();
      call(callable, caller, scannerTimeout, false);
      this.callable = null;
    }
  }

  /**
   * Will be called in moveToNextRegion when currentRegion is not null. Abstract because for normal
   * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we
   * will start next scan from the startKey of the currentRegion.
   * @return {@code false} if we have reached the stop row. Otherwise {@code true}.
   */
  protected abstract boolean setNewStartKey();

  /**
   * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed
   * scan we need to create a ReversedScannerCallable.
   */
  protected abstract ScannerCallable createScannerCallable();

  /**
   * Close the previous scanner and create a new ScannerCallable for the next scanner.
   * <p>
   * Marked as protected only because TestClientScanner need to override this method.
   * @return {@code false} if we should terminate the scan, otherwise {@code true}
   */
  protected boolean moveToNextRegion() {
    // Close the previous scanner if it's open
    try {
      closeScanner();
    } catch (IOException e) {
      // not a big deal continue
      LOG.debug("close scanner for {} failed", currentRegion, e);
    }
    if (currentRegion != null) {
      if (!setNewStartKey()) {
        return false;
      }
      scan.resetMvccReadPoint();
      LOG.trace("Finished {}", this.currentRegion);
    }
    if (LOG.isDebugEnabled() && this.currentRegion != null) {
      // Only worth logging if NOT first region in scan.
      LOG.debug("Advancing internal scanner to startKey at '{}', {}",
        Bytes.toStringBinary(scan.getStartRow()),
        scan.includeStartRow() ? "inclusive" : "exclusive");
    }
    // clear the current region, we will set a new value to it after the first call of the new
    // callable.
    this.currentRegion = null;
    this.callable =
      new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
        primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
    this.callable.setCaching(this.caching);
    incRegionCountMetrics(scanMetrics);
    return true;
  }

  /**
   * @return whether the current callable reports a cancelled RPC; {@code false} when there is no
   *         current callable (e.g. after {@link #close()}), instead of failing with a
   *         NullPointerException
   */
  boolean isAnyRPCcancelled() {
    return callable != null && callable.isAnyRPCcancelled();
  }

  /**
   * Issues a single scan RPC through the given caller.
   * @param callable the callable to run
   * @param caller the caller that executes the callable
   * @param scannerTimeout timeout passed to {@code callWithoutRetries}
   * @param updateCurrentRegion if {@code true} and {@link #currentRegion} is still null, record the
   *          region reported by the callable after the call
   * @return the results returned by the call
   * @throws InterruptedIOException if the calling thread was interrupted (the interrupt flag is
   *           cleared by the check)
   * @throws IOException if the call fails
   */
  private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
      int scannerTimeout, boolean updateCurrentRegion) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
    // we do a callWithRetries
    Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
    if (currentRegion == null && updateCurrentRegion) {
      currentRegion = callable.getHRegionInfo();
    }
    return rrs;
  }

  /**
   * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
   * application or TableInputFormat. Later, we could push it to other systems. We don't use metrics
   * framework because it doesn't support multi-instances of the same metrics on the same machine;
   * for scan/map reduce scenarios, we will have multiple scans running at the same time. By
   * default, scan metrics are disabled; if the application wants to collect them, this behavior can
   * be turned on by calling {@link Scan#setScanMetricsEnabled(boolean)}
   */
  protected void writeScanMetrics() {
    if (this.scanMetrics == null || scanMetricsPublished) {
      return;
    }
    // Publish ScanMetrics to the Scan Object.
    // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
    // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
    // to Scan will be messed up.
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
      ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
    scanMetricsPublished = true;
  }

  protected void initSyncCache() {
    cache = new ArrayDeque<>();
  }

  /**
   * Returns the next cached result, loading more results from the servers when the cache is empty.
   * @return the next {@link Result}, or {@code null} when the cache is empty and the scanner is
   *         closed or no further results could be loaded
   * @throws IOException if loading the cache fails
   */
  protected Result nextWithSyncCache() throws IOException {
    Result result = cache.poll();
    if (result != null) {
      return result;
    }
    // If there is nothing left in the cache and the scanner is closed,
    // return a no-op
    if (this.closed) {
      return null;
    }

    loadCache();

    // try again to load from cache
    result = cache.poll();

    // if we exhausted this scanner before calling close, write out the scan metrics
    if (result == null) {
      writeScanMetrics();
    }
    return result;
  }

  public int getCacheSize() {
    return cache != null ? cache.size() : 0;
  }

  /**
   * @return {@code true} if the current callable reports that the whole scan has no more results
   */
  private boolean scanExhausted() {
    return callable.moreResultsForScan() == MoreResults.NO;
  }

  private boolean regionExhausted(Result[] values) {
    // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. And in the
    // old time we always return empty result for a open scanner operation so we add a check here to
    // keep compatible with the old logic. Should remove the isOpenScanner in the future.
    // 2. Server tells us that it has no more results for this region.
    return (values.length == 0 && !callable.isHeartbeatMessage()) ||
      callable.moreResultsInRegion() == MoreResults.NO;
  }

  private void closeScannerIfExhausted(boolean exhausted) throws IOException {
    if (exhausted) {
      closeScanner();
    }
  }

  /**
   * Tells whether a {@link DoNotRetryIOException} only requires the region scanner to be reset
   * rather than failing the whole client scan.
   * @param e the exception raised by the scan call
   * @return {@code true} if the exception (or its cause) is one of the types after which the
   *         scanner is reopened and the scan continues
   */
  private static boolean isRecoverableScanError(DoNotRetryIOException e) {
    // instanceof is false for null, so no explicit null check on the cause is needed.
    Throwable cause = e.getCause();
    return cause instanceof NotServingRegionException ||
      cause instanceof RegionServerStoppedException ||
      e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException ||
      e instanceof ScannerResetException || e instanceof LeaseException;
  }

  /**
   * Decides whether a failed scan call can be recovered from. Either rethrows the exception or
   * prepares this scanner so that the caller can reopen a region scanner and retry.
   * @param e the exception thrown by the scan call
   * @param retryAfterOutOfOrderException whether one more retry after an
   *          {@link OutOfOrderScannerNextException} is still allowed; set to false when that retry
   *          is consumed here
   * @param retriesLeft number of retries remaining before this call
   * @throws DoNotRetryIOException if the exception is not recoverable, no retries are left, or an
   *           {@link OutOfOrderScannerNextException} was seen again right after a retry
   */
  private void handleScanError(DoNotRetryIOException e,
      MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
    // An exception was thrown which makes any partial results that we were collecting
    // invalid. The scanner will need to be reset to the beginning of a row.
    scanResultCache.clear();

    // Unfortunately, DNRIOE is used in two different semantics.
    // (1) The first is to close the client scanner and bubble up the exception all the way
    // to the application. This is preferred when the exception is really un-recoverable
    // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
    // bucket usually.
    // (2) Second semantics is to close the current region scanner only, but continue the
    // client scanner by overriding the exception. This is usually UnknownScannerException,
    // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
    // application-level ClientScanner has to continue without bubbling up the exception to
    // the client. See RSRpcServices to see how it throws DNRIOE's.
    // See also: HBASE-16604, HBASE-17187

    // If exception is any but the recoverable list, or we are out of retries, throw it back to
    // the client; else setup the scanner and retry.
    if (!isRecoverableScanError(e) || retriesLeft <= 0) {
      throw e;
    }

    // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
    if (this.lastResult != null) {
      // The region has moved. We need to open a brand new scanner at the new location.
      // Reset the startRow to the row we've seen last so that the new scanner starts at
      // the correct row. Otherwise we may see previously returned rows again.
      // If the lastRow is not partial, then we should start from the next row. As now we can
      // exclude the start row, the logic here is the same for both normal scan and reversed scan.
      // If lastResult is partial then include it, otherwise exclude it.
      scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow());
    }
    if (e instanceof OutOfOrderScannerNextException) {
      if (retryAfterOutOfOrderException.isTrue()) {
        retryAfterOutOfOrderException.setValue(false);
      } else {
        // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
        throw new DoNotRetryIOException(
          "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
      }
    }
    // Clear region.
    this.currentRegion = null;
    // Set this to null so we don't try and do an rpc and close on remote server when
    // the exception we got was UnknownScanner or the Server is going down.
    callable = null;
  }

  /**
   * Contact the servers to load more {@link Result}s in the cache.
   */
  protected void loadCache() throws IOException {
    // check if scanner was closed during previous prefetch
    if (closed) {
      return;
    }
    long remainingResultSize = maxScannerResultSize;
    int countdown = this.caching;
    // This is possible if we just stopped at the boundary of a region in the previous call.
    if (callable == null && !moveToNextRegion()) {
      closed = true;
      return;
    }
    // This flag is set when we want to skip the result returned. We do
    // this when we reset scanner because it split under us.
    MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
    // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should
    // make sure that we are not retrying indefinitely.
    int retriesLeft = getRetries();
    for (;;) {
      Result[] values;
      try {
        // Server returns a null values if scanning is to stop. Else,
        // returns an empty array if scanning is to go on and we've just
        // exhausted current region.
        // now we will also fetch data when openScanner, so do not make a next call again if values
        // is already non-null.
        values = call(callable, caller, scannerTimeout, true);
        // When the replica switch happens, we need to do certain operations again.
        // The callable will openScanner with the right startkey but we need to pick up
        // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
        // of the loop as it happens for the cases where we see exceptions.
        if (callable.switchedToADifferentReplica()) {
          // Any accumulated partial results are no longer valid since the callable will
          // openScanner with the correct startkey and we must pick up from there
          scanResultCache.clear();
          this.currentRegion = callable.getHRegionInfo();
        }
        retryAfterOutOfOrderException.setValue(true);
      } catch (DoNotRetryIOException e) {
        // Either rethrows, or resets the scanner state and consumes one retry.
        handleScanError(e, retryAfterOutOfOrderException, retriesLeft--);
        // reopen the scanner
        if (!moveToNextRegion()) {
          break;
        }
        continue;
      }
      long currentTime = System.currentTimeMillis();
      if (this.scanMetrics != null) {
        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
      }
      lastNext = currentTime;
      // Groom the array of Results that we received back from the server before adding that
      // Results to the scanner's cache. If partial results are not allowed to be seen by the
      // caller, all book keeping will be performed within this method.
      int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
      Result[] resultsToAddToCache =
        scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
      int numberOfCompleteRows =
        scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
      for (Result rs : resultsToAddToCache) {
        cache.add(rs);
        long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
        countdown--;
        remainingResultSize -= estimatedHeapSizeOfResult;
        addEstimatedSize(estimatedHeapSizeOfResult);
        this.lastResult = rs;
      }

      if (scan.getLimit() > 0) {
        int newLimit = scan.getLimit() - numberOfCompleteRows;
        assert newLimit >= 0;
        scan.setLimit(newLimit);
      }
      if (scan.getLimit() == 0 || scanExhausted()) {
        closeScanner();
        closed = true;
        break;
      }
      boolean regionExhausted = regionExhausted(values);
      if (callable.isHeartbeatMessage() && !cache.isEmpty()) {
        // Caller of this method just wants a Result. If we see a heartbeat message, it means
        // processing of the scan is taking a long time server side. Rather than continue to
        // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
        // unnecessary delays to the caller
        LOG.trace("Heartbeat message received and cache contains Results. " +
          "Breaking out of scan loop");
        // we know that the region has not been exhausted yet so just break without calling
        // closeScannerIfExhausted
        break;
      }
      if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
        if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
          // Use cursor row key from server
          cache.add(Result.createCursorResult(callable.getCursor()));
          break;
        }
        if (values.length > 0) {
          // It is size limit exceed and we need return the last Result's row.
          // When user setBatch and the scanner is reopened, the server may return Results that
          // user has seen and the last Result can not be seen because the number is not enough.
          // So the row keys of results may not be same, we must use the last one.
          cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow())));
          break;
        }
      }
      if (countdown <= 0) {
        // we have enough result.
        closeScannerIfExhausted(regionExhausted);
        break;
      }
      if (remainingResultSize <= 0) {
        if (!cache.isEmpty()) {
          closeScannerIfExhausted(regionExhausted);
          break;
        } else {
          // we have reached the max result size but we still can not find anything to return to the
          // user. Reset the maxResultSize and try again.
          remainingResultSize = maxScannerResultSize;
        }
      }
      // we are done with the current region
      if (regionExhausted) {
        if (!moveToNextRegion()) {
          closed = true;
          break;
        }
      }
    }
  }

  /**
   * Called from {@link #loadCache()} with the estimated heap size of every result added to the
   * cache. No-op in this class; it is protected, so a subclass may override it.
   * @param estimatedHeapSizeOfResult estimated heap size of the result just cached
   */
  protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
    // Intentionally empty.
  }

  public int getCacheCount() {
    return cache != null ? cache.size() : 0;
  }

  /**
   * Publishes the scan metrics if not yet published, sends a close request for the current
   * callable (if any) and marks this scanner as closed. IOExceptions raised by the close request
   * are logged and not propagated.
   */
  @Override
  public void close() {
    if (!scanMetricsPublished) {
      writeScanMetrics();
    }
    if (callable != null) {
      callable.setClose();
      try {
        call(callable, caller, scannerTimeout, false);
      } catch (UnknownScannerException e) {
        // We used to catch this error, interpret, and rethrow. However, we
        // have since decided that it's not nice for a scanner's close to
        // throw exceptions. Chances are it was just due to lease time out.
        LOG.debug("scanner failed to close", e);
      } catch (IOException e) {
        /* An exception other than UnknownScanner is unexpected. */
        LOG.warn("scanner failed to close.", e);
      }
      callable = null;
    }
    closed = true;
  }

  /**
   * Sends a renew request through the current callable.
   * @return {@code true} if the renew call completed without an exception; {@code false} if there
   *         is no current callable or the call failed
   */
  @Override
  public boolean renewLease() {
    if (callable == null) {
      return false;
    }
    // do not return any rows, do not advance the scanner
    callable.setRenew(true);
    try {
      this.caller.callWithoutRetries(callable, this.scannerTimeout);
      return true;
    } catch (Exception e) {
      LOG.debug("scanner failed to renew lease", e);
      return false;
    } finally {
      // Always reset the flag so the next regular call is not treated as a renew.
      callable.setRenew(false);
    }
  }

  protected void initCache() {
    initSyncCache();
  }

  @Override
  public Result next() throws IOException {
    return nextWithSyncCache();
  }
}