001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
023
024import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
025
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.util.ArrayDeque;
029import java.util.Queue;
030import java.util.concurrent.ExecutorService;
031
032import org.apache.commons.lang3.mutable.MutableBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
035import org.apache.hadoop.hbase.DoNotRetryIOException;
036import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
037import org.apache.hadoop.hbase.exceptions.ScannerResetException;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionInfo;
040import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
041import org.apache.hadoop.hbase.regionserver.LeaseException;
042import org.apache.hadoop.hbase.NotServingRegionException;
043import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.UnknownScannerException;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
052
053/**
054 * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
055 * this scanner will iterate through them all.
056 */
057@InterfaceAudience.Private
058public abstract class ClientScanner extends AbstractClientScanner {
059
060  private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class);
061
062  protected final Scan scan;
063  protected boolean closed = false;
064  // Current region scanner is against. Gets cleared if current region goes
065  // wonky: e.g. if it splits on us.
066  protected HRegionInfo currentRegion = null;
067  protected ScannerCallableWithReplicas callable = null;
068  protected Queue<Result> cache;
069  private final ScanResultCache scanResultCache;
070  protected final int caching;
071  protected long lastNext;
072  // Keep lastResult returned successfully in case we have to reset scanner.
073  protected Result lastResult = null;
074  protected final long maxScannerResultSize;
075  private final ClusterConnection connection;
076  protected final TableName tableName;
077  protected final int scannerTimeout;
078  protected boolean scanMetricsPublished = false;
079  protected RpcRetryingCaller<Result[]> caller;
080  protected RpcControllerFactory rpcControllerFactory;
081  protected Configuration conf;
082  // The timeout on the primary. Applicable if there are multiple replicas for a region
083  // In that case, we will only wait for this much timeout on the primary before going
084  // to the replicas and trying the same scan. Note that the retries will still happen
085  // on each replica and the first successful results will be taken. A timeout of 0 is
086  // disallowed.
087  protected final int primaryOperationTimeout;
088  private int retries;
089  protected final ExecutorService pool;
090
091  /**
092   * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
093   * row maybe changed changed.
094   * @param conf The {@link Configuration} to use.
095   * @param scan {@link Scan} to use in this scanner
096   * @param tableName The table that we wish to scan
097   * @param connection Connection identifying the cluster
098   * @throws IOException
099   */
100  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
101      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
102      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
103      throws IOException {
104    if (LOG.isTraceEnabled()) {
105      LOG.trace(
106        "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
107    }
108    this.scan = scan;
109    this.tableName = tableName;
110    this.lastNext = System.currentTimeMillis();
111    this.connection = connection;
112    this.pool = pool;
113    this.primaryOperationTimeout = primaryOperationTimeout;
114    this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
115      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
116    if (scan.getMaxResultSize() > 0) {
117      this.maxScannerResultSize = scan.getMaxResultSize();
118    } else {
119      this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
120        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
121    }
122    this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
123        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
124
125    // check if application wants to collect scan metrics
126    initScanMetrics(scan);
127
128    // Use the caching from the Scan. If not set, use the default cache setting for this table.
129    if (this.scan.getCaching() > 0) {
130      this.caching = this.scan.getCaching();
131    } else {
132      this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
133        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
134    }
135
136    this.caller = rpcFactory.<Result[]> newCaller();
137    this.rpcControllerFactory = controllerFactory;
138
139    this.conf = conf;
140
141    this.scanResultCache = createScanResultCache(scan);
142    initCache();
143  }
144
145  protected ClusterConnection getConnection() {
146    return this.connection;
147  }
148
149  protected TableName getTable() {
150    return this.tableName;
151  }
152
153  protected int getRetries() {
154    return this.retries;
155  }
156
157  protected int getScannerTimeout() {
158    return this.scannerTimeout;
159  }
160
161  protected Configuration getConf() {
162    return this.conf;
163  }
164
165  protected Scan getScan() {
166    return scan;
167  }
168
169  protected ExecutorService getPool() {
170    return pool;
171  }
172
173  protected int getPrimaryOperationTimeout() {
174    return primaryOperationTimeout;
175  }
176
177  protected int getCaching() {
178    return caching;
179  }
180
181  protected long getTimestamp() {
182    return lastNext;
183  }
184
185  @VisibleForTesting
186  protected long getMaxResultSize() {
187    return maxScannerResultSize;
188  }
189
190  private void closeScanner() throws IOException {
191    if (this.callable != null) {
192      this.callable.setClose();
193      call(callable, caller, scannerTimeout, false);
194      this.callable = null;
195    }
196  }
197
198  /**
199   * Will be called in moveToNextRegion when currentRegion is null. Abstract because for normal
200   * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we
201   * will start next scan from the startKey of the currentRegion.
202   * @return {@code false} if we have reached the stop row. Otherwise {@code true}.
203   */
204  protected abstract boolean setNewStartKey();
205
206  /**
207   * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed
208   * scan we need to create a ReversedScannerCallable.
209   */
210  protected abstract ScannerCallable createScannerCallable();
211
212  /**
213   * Close the previous scanner and create a new ScannerCallable for the next scanner.
214   * <p>
215   * Marked as protected only because TestClientScanner need to override this method.
216   * @return false if we should terminate the scan. Otherwise
217   */
218  @VisibleForTesting
219  protected boolean moveToNextRegion() {
220    // Close the previous scanner if it's open
221    try {
222      closeScanner();
223    } catch (IOException e) {
224      // not a big deal continue
225      if (LOG.isDebugEnabled()) {
226        LOG.debug("close scanner for " + currentRegion + " failed", e);
227      }
228    }
229    if (currentRegion != null) {
230      if (!setNewStartKey()) {
231        return false;
232      }
233      scan.resetMvccReadPoint();
234      if (LOG.isTraceEnabled()) {
235        LOG.trace("Finished " + this.currentRegion);
236      }
237    }
238    if (LOG.isDebugEnabled() && this.currentRegion != null) {
239      // Only worth logging if NOT first region in scan.
240      LOG.debug(
241        "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow()) +
242            "', " + (scan.includeStartRow() ? "inclusive" : "exclusive"));
243    }
244    // clear the current region, we will set a new value to it after the first call of the new
245    // callable.
246    this.currentRegion = null;
247    this.callable =
248        new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
249            primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
250    this.callable.setCaching(this.caching);
251    incRegionCountMetrics(scanMetrics);
252    return true;
253  }
254
255  @VisibleForTesting
256  boolean isAnyRPCcancelled() {
257    return callable.isAnyRPCcancelled();
258  }
259
260  private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
261      int scannerTimeout, boolean updateCurrentRegion) throws IOException {
262    if (Thread.interrupted()) {
263      throw new InterruptedIOException();
264    }
265    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
266    // we do a callWithRetries
267    Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
268    if (currentRegion == null && updateCurrentRegion) {
269      currentRegion = callable.getHRegionInfo();
270    }
271    return rrs;
272  }
273
274  /**
275   * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
276   * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
277   * framework because it doesn't support multi-instances of the same metrics on the same machine;
278   * for scan/map reduce scenarios, we will have multiple scans running at the same time. By
279   * default, scan metrics are disabled; if the application wants to collect them, this behavior can
280   * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
281   */
282  protected void writeScanMetrics() {
283    if (this.scanMetrics == null || scanMetricsPublished) {
284      return;
285    }
286    // Publish ScanMetrics to the Scan Object.
287    // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
288    // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
289    // to Scan will be messed up.
290    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
291      ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
292    scanMetricsPublished = true;
293  }
294
295  protected void initSyncCache() {
296    cache = new ArrayDeque<>();
297  }
298
299  protected Result nextWithSyncCache() throws IOException {
300    Result result = cache.poll();
301    if (result != null) {
302      return result;
303    }
304    // If there is nothing left in the cache and the scanner is closed,
305    // return a no-op
306    if (this.closed) {
307      return null;
308    }
309
310    loadCache();
311
312    // try again to load from cache
313    result = cache.poll();
314
315    // if we exhausted this scanner before calling close, write out the scan metrics
316    if (result == null) {
317      writeScanMetrics();
318    }
319    return result;
320  }
321
322  @VisibleForTesting
323  public int getCacheSize() {
324    return cache != null ? cache.size() : 0;
325  }
326
327  private boolean scanExhausted(Result[] values) {
328    return callable.moreResultsForScan() == MoreResults.NO;
329  }
330
331  private boolean regionExhausted(Result[] values) {
332    // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. And in the
333    // old time we always return empty result for a open scanner operation so we add a check here to
334    // keep compatible with the old logic. Should remove the isOpenScanner in the future.
335    // 2. Server tells us that it has no more results for this region.
336    return (values.length == 0 && !callable.isHeartbeatMessage()) ||
337        callable.moreResultsInRegion() == MoreResults.NO;
338  }
339
340  private void closeScannerIfExhausted(boolean exhausted) throws IOException {
341    if (exhausted) {
342      closeScanner();
343    }
344  }
345
346  private void handleScanError(DoNotRetryIOException e,
347      MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
348    // An exception was thrown which makes any partial results that we were collecting
349    // invalid. The scanner will need to be reset to the beginning of a row.
350    scanResultCache.clear();
351
352    // Unfortunately, DNRIOE is used in two different semantics.
353    // (1) The first is to close the client scanner and bubble up the exception all the way
354    // to the application. This is preferred when the exception is really un-recoverable
355    // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
356    // bucket usually.
357    // (2) Second semantics is to close the current region scanner only, but continue the
358    // client scanner by overriding the exception. This is usually UnknownScannerException,
359    // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
360    // application-level ClientScanner has to continue without bubbling up the exception to
361    // the client. See RSRpcServices to see how it throws DNRIOE's.
362    // See also: HBASE-16604, HBASE-17187
363
364    // If exception is any but the list below throw it back to the client; else setup
365    // the scanner and retry.
366    Throwable cause = e.getCause();
367    if ((cause != null && cause instanceof NotServingRegionException) ||
368        (cause != null && cause instanceof RegionServerStoppedException) ||
369        e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException ||
370        e instanceof ScannerResetException || e instanceof LeaseException) {
371      // Pass. It is easier writing the if loop test as list of what is allowed rather than
372      // as a list of what is not allowed... so if in here, it means we do not throw.
373      if (retriesLeft <= 0) {
374        throw e; // no more retries
375      }
376    } else {
377      throw e;
378    }
379
380    // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
381    if (this.lastResult != null) {
382      // The region has moved. We need to open a brand new scanner at the new location.
383      // Reset the startRow to the row we've seen last so that the new scanner starts at
384      // the correct row. Otherwise we may see previously returned rows again.
385      // If the lastRow is not partial, then we should start from the next row. As now we can
386      // exclude the start row, the logic here is the same for both normal scan and reversed scan.
387      // If lastResult is partial then include it, otherwise exclude it.
388      scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow());
389    }
390    if (e instanceof OutOfOrderScannerNextException) {
391      if (retryAfterOutOfOrderException.isTrue()) {
392        retryAfterOutOfOrderException.setValue(false);
393      } else {
394        // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
395        throw new DoNotRetryIOException(
396            "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
397      }
398    }
399    // Clear region.
400    this.currentRegion = null;
401    // Set this to zero so we don't try and do an rpc and close on remote server when
402    // the exception we got was UnknownScanner or the Server is going down.
403    callable = null;
404  }
405
406  /**
407   * Contact the servers to load more {@link Result}s in the cache.
408   */
409  protected void loadCache() throws IOException {
410    // check if scanner was closed during previous prefetch
411    if (closed) {
412      return;
413    }
414    long remainingResultSize = maxScannerResultSize;
415    int countdown = this.caching;
416    // This is possible if we just stopped at the boundary of a region in the previous call.
417    if (callable == null && !moveToNextRegion()) {
418      closed = true;
419      return;
420    }
421    // This flag is set when we want to skip the result returned. We do
422    // this when we reset scanner because it split under us.
423    MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
424    // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should
425    // make sure that we are not retrying indefinitely.
426    int retriesLeft = getRetries();
427    for (;;) {
428      Result[] values;
429      try {
430        // Server returns a null values if scanning is to stop. Else,
431        // returns an empty array if scanning is to go on and we've just
432        // exhausted current region.
433        // now we will also fetch data when openScanner, so do not make a next call again if values
434        // is already non-null.
435        values = call(callable, caller, scannerTimeout, true);
436        // When the replica switch happens, we need to do certain operations again.
437        // The callable will openScanner with the right startkey but we need to pick up
438        // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
439        // of the loop as it happens for the cases where we see exceptions.
440        if (callable.switchedToADifferentReplica()) {
441          // Any accumulated partial results are no longer valid since the callable will
442          // openScanner with the correct startkey and we must pick up from there
443          scanResultCache.clear();
444          this.currentRegion = callable.getHRegionInfo();
445        }
446        retryAfterOutOfOrderException.setValue(true);
447      } catch (DoNotRetryIOException e) {
448        handleScanError(e, retryAfterOutOfOrderException, retriesLeft--);
449        // reopen the scanner
450        if (!moveToNextRegion()) {
451          break;
452        }
453        continue;
454      }
455      long currentTime = System.currentTimeMillis();
456      if (this.scanMetrics != null) {
457        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
458      }
459      lastNext = currentTime;
460      // Groom the array of Results that we received back from the server before adding that
461      // Results to the scanner's cache. If partial results are not allowed to be seen by the
462      // caller, all book keeping will be performed within this method.
463      int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
464      Result[] resultsToAddToCache =
465          scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
466      int numberOfCompleteRows =
467          scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
468      for (Result rs : resultsToAddToCache) {
469        cache.add(rs);
470        long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
471        countdown--;
472        remainingResultSize -= estimatedHeapSizeOfResult;
473        addEstimatedSize(estimatedHeapSizeOfResult);
474        this.lastResult = rs;
475      }
476
477      if (scan.getLimit() > 0) {
478        int newLimit = scan.getLimit() - numberOfCompleteRows;
479        assert newLimit >= 0;
480        scan.setLimit(newLimit);
481      }
482      if (scan.getLimit() == 0 || scanExhausted(values)) {
483        closeScanner();
484        closed = true;
485        break;
486      }
487      boolean regionExhausted = regionExhausted(values);
488      if (callable.isHeartbeatMessage()) {
489        if (!cache.isEmpty()) {
490          // Caller of this method just wants a Result. If we see a heartbeat message, it means
491          // processing of the scan is taking a long time server side. Rather than continue to
492          // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
493          // unnecesary delays to the caller
494          LOG.trace("Heartbeat message received and cache contains Results. " +
495            "Breaking out of scan loop");
496          // we know that the region has not been exhausted yet so just break without calling
497          // closeScannerIfExhausted
498          break;
499        }
500      }
501      if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
502        if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
503          // Use cursor row key from server
504          cache.add(Result.createCursorResult(callable.getCursor()));
505          break;
506        }
507        if (values.length > 0) {
508          // It is size limit exceed and we need return the last Result's row.
509          // When user setBatch and the scanner is reopened, the server may return Results that
510          // user has seen and the last Result can not be seen because the number is not enough.
511          // So the row keys of results may not be same, we must use the last one.
512          cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow())));
513          break;
514        }
515      }
516      if (countdown <= 0) {
517        // we have enough result.
518        closeScannerIfExhausted(regionExhausted);
519        break;
520      }
521      if (remainingResultSize <= 0) {
522        if (!cache.isEmpty()) {
523          closeScannerIfExhausted(regionExhausted);
524          break;
525        } else {
526          // we have reached the max result size but we still can not find anything to return to the
527          // user. Reset the maxResultSize and try again.
528          remainingResultSize = maxScannerResultSize;
529        }
530      }
531      // we are done with the current region
532      if (regionExhausted) {
533        if (!moveToNextRegion()) {
534          closed = true;
535          break;
536        }
537      }
538    }
539  }
540
541  protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
542    return;
543  }
544
545  @VisibleForTesting
546  public int getCacheCount() {
547    return cache != null ? cache.size() : 0;
548  }
549
550  @Override
551  public void close() {
552    if (!scanMetricsPublished) writeScanMetrics();
553    if (callable != null) {
554      callable.setClose();
555      try {
556        call(callable, caller, scannerTimeout, false);
557      } catch (UnknownScannerException e) {
558        // We used to catch this error, interpret, and rethrow. However, we
559        // have since decided that it's not nice for a scanner's close to
560        // throw exceptions. Chances are it was just due to lease time out.
561        LOG.debug("scanner failed to close", e);
562      } catch (IOException e) {
563        /* An exception other than UnknownScanner is unexpected. */
564        LOG.warn("scanner failed to close.", e);
565      }
566      callable = null;
567    }
568    closed = true;
569  }
570
571  @Override
572  public boolean renewLease() {
573    if (callable == null) {
574      return false;
575    }
576    // do not return any rows, do not advance the scanner
577    callable.setRenew(true);
578    try {
579      this.caller.callWithoutRetries(callable, this.scannerTimeout);
580      return true;
581    } catch (Exception e) {
582      LOG.debug("scanner failed to renew lease", e);
583      return false;
584    } finally {
585      callable.setRenew(false);
586    }
587  }
588
589  protected void initCache() {
590    initSyncCache();
591  }
592
593  @Override
594  public Result next() throws IOException {
595    return nextWithSyncCache();
596  }
597}