001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.util.ArrayDeque;
027import java.util.Queue;
028import java.util.concurrent.ExecutorService;
029import org.apache.commons.lang3.mutable.MutableBoolean;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.DoNotRetryIOException;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionInfo;
034import org.apache.hadoop.hbase.NotServingRegionException;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.UnknownScannerException;
037import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
038import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
039import org.apache.hadoop.hbase.exceptions.ScannerResetException;
040import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
041import org.apache.hadoop.hbase.regionserver.LeaseException;
042import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
051 * this scanner will iterate through them all.
052 */
053@InterfaceAudience.Private
054public abstract class ClientScanner extends AbstractClientScanner {
055
056  private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class);
057
058  protected final Scan scan;
059  protected boolean closed = false;
060  // Current region scanner is against. Gets cleared if current region goes
061  // wonky: e.g. if it splits on us.
062  protected HRegionInfo currentRegion = null;
063  protected ScannerCallableWithReplicas callable = null;
064  protected Queue<Result> cache;
065  private final ScanResultCache scanResultCache;
066  protected final int caching;
067  protected long lastNext;
068  // Keep lastResult returned successfully in case we have to reset scanner.
069  protected Result lastResult = null;
070  protected final long maxScannerResultSize;
071  private final ClusterConnection connection;
072  protected final TableName tableName;
073  protected final int scannerTimeout;
074  protected boolean scanMetricsPublished = false;
075  protected RpcRetryingCaller<Result[]> caller;
076  protected RpcControllerFactory rpcControllerFactory;
077  protected Configuration conf;
078  // The timeout on the primary. Applicable if there are multiple replicas for a region
079  // In that case, we will only wait for this much timeout on the primary before going
080  // to the replicas and trying the same scan. Note that the retries will still happen
081  // on each replica and the first successful results will be taken. A timeout of 0 is
082  // disallowed.
083  protected final int primaryOperationTimeout;
084  private int retries;
085  protected final ExecutorService pool;
086
087  /**
088   * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
089   * row maybe changed changed.
090   * @param conf The {@link Configuration} to use.
091   * @param scan {@link Scan} to use in this scanner
092   * @param tableName The table that we wish to scan
093   * @param connection Connection identifying the cluster
094   * @throws IOException
095   */
096  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
097      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
098      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
099      throws IOException {
100    if (LOG.isTraceEnabled()) {
101      LOG.trace(
102        "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
103    }
104    this.scan = scan;
105    this.tableName = tableName;
106    this.lastNext = System.currentTimeMillis();
107    this.connection = connection;
108    this.pool = pool;
109    this.primaryOperationTimeout = primaryOperationTimeout;
110    this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
111      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
112    if (scan.getMaxResultSize() > 0) {
113      this.maxScannerResultSize = scan.getMaxResultSize();
114    } else {
115      this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
116        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
117    }
118    this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
119        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
120
121    // check if application wants to collect scan metrics
122    initScanMetrics(scan);
123
124    // Use the caching from the Scan. If not set, use the default cache setting for this table.
125    if (this.scan.getCaching() > 0) {
126      this.caching = this.scan.getCaching();
127    } else {
128      this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
129        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
130    }
131
132    this.caller = rpcFactory.<Result[]> newCaller();
133    this.rpcControllerFactory = controllerFactory;
134
135    this.conf = conf;
136
137    this.scanResultCache = createScanResultCache(scan);
138    initCache();
139  }
140
141  protected final int getScanReplicaId() {
142    return scan.getReplicaId() >= RegionReplicaUtil.DEFAULT_REPLICA_ID ? scan.getReplicaId() :
143      RegionReplicaUtil.DEFAULT_REPLICA_ID;
144  }
145
146  protected ClusterConnection getConnection() {
147    return this.connection;
148  }
149
150  protected TableName getTable() {
151    return this.tableName;
152  }
153
154  protected int getRetries() {
155    return this.retries;
156  }
157
158  protected int getScannerTimeout() {
159    return this.scannerTimeout;
160  }
161
162  protected Configuration getConf() {
163    return this.conf;
164  }
165
166  protected Scan getScan() {
167    return scan;
168  }
169
170  protected ExecutorService getPool() {
171    return pool;
172  }
173
174  protected int getPrimaryOperationTimeout() {
175    return primaryOperationTimeout;
176  }
177
178  protected int getCaching() {
179    return caching;
180  }
181
182  protected long getTimestamp() {
183    return lastNext;
184  }
185
186  protected long getMaxResultSize() {
187    return maxScannerResultSize;
188  }
189
190  private void closeScanner() throws IOException {
191    if (this.callable != null) {
192      this.callable.setClose();
193      call(callable, caller, scannerTimeout, false);
194      this.callable = null;
195    }
196  }
197
198  /**
199   * Will be called in moveToNextRegion when currentRegion is null. Abstract because for normal
200   * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we
201   * will start next scan from the startKey of the currentRegion.
202   * @return {@code false} if we have reached the stop row. Otherwise {@code true}.
203   */
204  protected abstract boolean setNewStartKey();
205
206  /**
207   * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed
208   * scan we need to create a ReversedScannerCallable.
209   */
210  protected abstract ScannerCallable createScannerCallable();
211
212  /**
213   * Close the previous scanner and create a new ScannerCallable for the next scanner.
214   * <p>
215   * Marked as protected only because TestClientScanner need to override this method.
216   * @return false if we should terminate the scan. Otherwise
217   */
218  protected boolean moveToNextRegion() {
219    // Close the previous scanner if it's open
220    try {
221      closeScanner();
222    } catch (IOException e) {
223      // not a big deal continue
224      if (LOG.isDebugEnabled()) {
225        LOG.debug("close scanner for " + currentRegion + " failed", e);
226      }
227    }
228    if (currentRegion != null) {
229      if (!setNewStartKey()) {
230        return false;
231      }
232      scan.resetMvccReadPoint();
233      if (LOG.isTraceEnabled()) {
234        LOG.trace("Finished " + this.currentRegion);
235      }
236    }
237    if (LOG.isDebugEnabled() && this.currentRegion != null) {
238      // Only worth logging if NOT first region in scan.
239      LOG.debug(
240        "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow()) +
241            "', " + (scan.includeStartRow() ? "inclusive" : "exclusive"));
242    }
243    // clear the current region, we will set a new value to it after the first call of the new
244    // callable.
245    this.currentRegion = null;
246    this.callable =
247        new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
248            primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
249    this.callable.setCaching(this.caching);
250    incRegionCountMetrics(scanMetrics);
251    return true;
252  }
253
254  boolean isAnyRPCcancelled() {
255    return callable.isAnyRPCcancelled();
256  }
257
258  private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
259      int scannerTimeout, boolean updateCurrentRegion) throws IOException {
260    if (Thread.interrupted()) {
261      throw new InterruptedIOException();
262    }
263    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
264    // we do a callWithRetries
265    Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
266    if (currentRegion == null && updateCurrentRegion) {
267      currentRegion = callable.getHRegionInfo();
268    }
269    return rrs;
270  }
271
272  /**
273   * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
274   * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
275   * framework because it doesn't support multi-instances of the same metrics on the same machine;
276   * for scan/map reduce scenarios, we will have multiple scans running at the same time. By
277   * default, scan metrics are disabled; if the application wants to collect them, this behavior can
278   * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
279   */
280  protected void writeScanMetrics() {
281    if (this.scanMetrics == null || scanMetricsPublished) {
282      return;
283    }
284    // Publish ScanMetrics to the Scan Object.
285    // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
286    // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
287    // to Scan will be messed up.
288    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
289      ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
290    scanMetricsPublished = true;
291  }
292
293  protected void initSyncCache() {
294    cache = new ArrayDeque<>();
295  }
296
297  protected Result nextWithSyncCache() throws IOException {
298    Result result = cache.poll();
299    if (result != null) {
300      return result;
301    }
302    // If there is nothing left in the cache and the scanner is closed,
303    // return a no-op
304    if (this.closed) {
305      return null;
306    }
307
308    loadCache();
309
310    // try again to load from cache
311    result = cache.poll();
312
313    // if we exhausted this scanner before calling close, write out the scan metrics
314    if (result == null) {
315      writeScanMetrics();
316    }
317    return result;
318  }
319
320  public int getCacheSize() {
321    return cache != null ? cache.size() : 0;
322  }
323
324  private boolean scanExhausted(Result[] values) {
325    return callable.moreResultsForScan() == MoreResults.NO;
326  }
327
328  private boolean regionExhausted(Result[] values) {
329    // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. And in the
330    // old time we always return empty result for a open scanner operation so we add a check here to
331    // keep compatible with the old logic. Should remove the isOpenScanner in the future.
332    // 2. Server tells us that it has no more results for this region.
333    return (values.length == 0 && !callable.isHeartbeatMessage()) ||
334        callable.moreResultsInRegion() == MoreResults.NO;
335  }
336
337  private void closeScannerIfExhausted(boolean exhausted) throws IOException {
338    if (exhausted) {
339      closeScanner();
340    }
341  }
342
343  private void handleScanError(DoNotRetryIOException e,
344      MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
345    // An exception was thrown which makes any partial results that we were collecting
346    // invalid. The scanner will need to be reset to the beginning of a row.
347    scanResultCache.clear();
348
349    // Unfortunately, DNRIOE is used in two different semantics.
350    // (1) The first is to close the client scanner and bubble up the exception all the way
351    // to the application. This is preferred when the exception is really un-recoverable
352    // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
353    // bucket usually.
354    // (2) Second semantics is to close the current region scanner only, but continue the
355    // client scanner by overriding the exception. This is usually UnknownScannerException,
356    // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
357    // application-level ClientScanner has to continue without bubbling up the exception to
358    // the client. See RSRpcServices to see how it throws DNRIOE's.
359    // See also: HBASE-16604, HBASE-17187
360
361    // If exception is any but the list below throw it back to the client; else setup
362    // the scanner and retry.
363    Throwable cause = e.getCause();
364    if ((cause != null && cause instanceof NotServingRegionException) ||
365        (cause != null && cause instanceof RegionServerStoppedException) ||
366        e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException ||
367        e instanceof ScannerResetException || e instanceof LeaseException) {
368      // Pass. It is easier writing the if loop test as list of what is allowed rather than
369      // as a list of what is not allowed... so if in here, it means we do not throw.
370      if (retriesLeft <= 0) {
371        throw e; // no more retries
372      }
373    } else {
374      throw e;
375    }
376
377    // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
378    if (this.lastResult != null) {
379      // The region has moved. We need to open a brand new scanner at the new location.
380      // Reset the startRow to the row we've seen last so that the new scanner starts at
381      // the correct row. Otherwise we may see previously returned rows again.
382      // If the lastRow is not partial, then we should start from the next row. As now we can
383      // exclude the start row, the logic here is the same for both normal scan and reversed scan.
384      // If lastResult is partial then include it, otherwise exclude it.
385      scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow());
386    }
387    if (e instanceof OutOfOrderScannerNextException) {
388      if (retryAfterOutOfOrderException.isTrue()) {
389        retryAfterOutOfOrderException.setValue(false);
390      } else {
391        // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
392        throw new DoNotRetryIOException(
393            "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
394      }
395    }
396    // Clear region.
397    this.currentRegion = null;
398    // Set this to zero so we don't try and do an rpc and close on remote server when
399    // the exception we got was UnknownScanner or the Server is going down.
400    callable = null;
401  }
402
403  /**
404   * Contact the servers to load more {@link Result}s in the cache.
405   */
406  protected void loadCache() throws IOException {
407    // check if scanner was closed during previous prefetch
408    if (closed) {
409      return;
410    }
411    long remainingResultSize = maxScannerResultSize;
412    int countdown = this.caching;
413    // This is possible if we just stopped at the boundary of a region in the previous call.
414    if (callable == null && !moveToNextRegion()) {
415      closed = true;
416      return;
417    }
418    // This flag is set when we want to skip the result returned. We do
419    // this when we reset scanner because it split under us.
420    MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
421    // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should
422    // make sure that we are not retrying indefinitely.
423    int retriesLeft = getRetries();
424    for (;;) {
425      Result[] values;
426      try {
427        // Server returns a null values if scanning is to stop. Else,
428        // returns an empty array if scanning is to go on and we've just
429        // exhausted current region.
430        // now we will also fetch data when openScanner, so do not make a next call again if values
431        // is already non-null.
432        values = call(callable, caller, scannerTimeout, true);
433        // When the replica switch happens, we need to do certain operations again.
434        // The callable will openScanner with the right startkey but we need to pick up
435        // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
436        // of the loop as it happens for the cases where we see exceptions.
437        if (callable.switchedToADifferentReplica()) {
438          // Any accumulated partial results are no longer valid since the callable will
439          // openScanner with the correct startkey and we must pick up from there
440          scanResultCache.clear();
441          this.currentRegion = callable.getHRegionInfo();
442        }
443        retryAfterOutOfOrderException.setValue(true);
444      } catch (DoNotRetryIOException e) {
445        handleScanError(e, retryAfterOutOfOrderException, retriesLeft--);
446        // reopen the scanner
447        if (!moveToNextRegion()) {
448          break;
449        }
450        continue;
451      }
452      long currentTime = System.currentTimeMillis();
453      if (this.scanMetrics != null) {
454        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
455      }
456      lastNext = currentTime;
457      // Groom the array of Results that we received back from the server before adding that
458      // Results to the scanner's cache. If partial results are not allowed to be seen by the
459      // caller, all book keeping will be performed within this method.
460      int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
461      Result[] resultsToAddToCache =
462          scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
463      int numberOfCompleteRows =
464          scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
465      for (Result rs : resultsToAddToCache) {
466        cache.add(rs);
467        long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
468        countdown--;
469        remainingResultSize -= estimatedHeapSizeOfResult;
470        addEstimatedSize(estimatedHeapSizeOfResult);
471        this.lastResult = rs;
472      }
473
474      if (scan.getLimit() > 0) {
475        int newLimit = scan.getLimit() - numberOfCompleteRows;
476        assert newLimit >= 0;
477        scan.setLimit(newLimit);
478      }
479      if (scan.getLimit() == 0 || scanExhausted(values)) {
480        closeScanner();
481        closed = true;
482        break;
483      }
484      boolean regionExhausted = regionExhausted(values);
485      if (callable.isHeartbeatMessage()) {
486        if (!cache.isEmpty()) {
487          // Caller of this method just wants a Result. If we see a heartbeat message, it means
488          // processing of the scan is taking a long time server side. Rather than continue to
489          // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
490          // unnecesary delays to the caller
491          LOG.trace("Heartbeat message received and cache contains Results. " +
492            "Breaking out of scan loop");
493          // we know that the region has not been exhausted yet so just break without calling
494          // closeScannerIfExhausted
495          break;
496        }
497      }
498      if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
499        if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
500          // Use cursor row key from server
501          cache.add(Result.createCursorResult(callable.getCursor()));
502          break;
503        }
504        if (values.length > 0) {
505          // It is size limit exceed and we need return the last Result's row.
506          // When user setBatch and the scanner is reopened, the server may return Results that
507          // user has seen and the last Result can not be seen because the number is not enough.
508          // So the row keys of results may not be same, we must use the last one.
509          cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow())));
510          break;
511        }
512      }
513      if (countdown <= 0) {
514        // we have enough result.
515        closeScannerIfExhausted(regionExhausted);
516        break;
517      }
518      if (remainingResultSize <= 0) {
519        if (!cache.isEmpty()) {
520          closeScannerIfExhausted(regionExhausted);
521          break;
522        } else {
523          // we have reached the max result size but we still can not find anything to return to the
524          // user. Reset the maxResultSize and try again.
525          remainingResultSize = maxScannerResultSize;
526        }
527      }
528      // we are done with the current region
529      if (regionExhausted) {
530        if (!moveToNextRegion()) {
531          closed = true;
532          break;
533        }
534      }
535    }
536  }
537
538  protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
539    return;
540  }
541
542  public int getCacheCount() {
543    return cache != null ? cache.size() : 0;
544  }
545
546  @Override
547  public void close() {
548    if (!scanMetricsPublished) writeScanMetrics();
549    if (callable != null) {
550      callable.setClose();
551      try {
552        call(callable, caller, scannerTimeout, false);
553      } catch (UnknownScannerException e) {
554        // We used to catch this error, interpret, and rethrow. However, we
555        // have since decided that it's not nice for a scanner's close to
556        // throw exceptions. Chances are it was just due to lease time out.
557        LOG.debug("scanner failed to close", e);
558      } catch (IOException e) {
559        /* An exception other than UnknownScanner is unexpected. */
560        LOG.warn("scanner failed to close.", e);
561      }
562      callable = null;
563    }
564    closed = true;
565  }
566
567  @Override
568  public boolean renewLease() {
569    if (callable == null) {
570      return false;
571    }
572    // do not return any rows, do not advance the scanner
573    callable.setRenew(true);
574    try {
575      this.caller.callWithoutRetries(callable, this.scannerTimeout);
576      return true;
577    } catch (Exception e) {
578      LOG.debug("scanner failed to renew lease", e);
579      return false;
580    } finally {
581      callable.setRenew(false);
582    }
583  }
584
585  protected void initCache() {
586    initSyncCache();
587  }
588
589  @Override
590  public Result next() throws IOException {
591    return nextWithSyncCache();
592  }
593}