001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
023
024import io.opentelemetry.api.trace.Span;
025import io.opentelemetry.api.trace.StatusCode;
026import io.opentelemetry.context.Scope;
027import java.io.IOException;
028import java.io.InterruptedIOException;
029import java.util.ArrayDeque;
030import java.util.Queue;
031import java.util.concurrent.ExecutorService;
032import org.apache.commons.lang3.mutable.MutableBoolean;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.HRegionInfo;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.UnknownScannerException;
040import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
041import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
042import org.apache.hadoop.hbase.exceptions.ScannerResetException;
043import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
044import org.apache.hadoop.hbase.regionserver.LeaseException;
045import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053
054/**
055 * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
056 * this scanner will iterate through them all.
057 */
058@InterfaceAudience.Private
059public abstract class ClientScanner extends AbstractClientScanner {
060
  private static final Logger LOG = LoggerFactory.getLogger(ClientScanner.class);

  // The scan being executed. It is mutated while scanning: the start row is moved forward when a
  // scanner has to be reopened, the limit is decremented as complete rows arrive, and the mvcc
  // read point is reset when moving on from a finished region.
  protected final Scan scan;
  // True once the scan has finished or close() has been called; loadCache() is a no-op afterwards.
  protected boolean closed = false;
  // Current region scanner is against. Gets cleared if current region goes
  // wonky: e.g. if it splits on us.
  protected HRegionInfo currentRegion = null;
  // Callable for the currently open region scanner; null when no scanner is open and a new one
  // has to be created by moveToNextRegion().
  protected ScannerCallableWithReplicas callable = null;
  // Results already fetched but not yet handed out by next(). Created by initSyncCache().
  protected Queue<Result> cache;
  // Grooms the raw Result arrays coming back from the server before they are put into 'cache'
  // and keeps track of the number of complete rows seen.
  private final ScanResultCache scanResultCache;
  // Number of Results loadCache() tries to collect per invocation. Taken from the Scan when it
  // is positive, otherwise from the client configuration.
  protected final int caching;
  // Time (from EnvironmentEdgeManager) of the most recent fetch; used to feed the
  // sumOfMillisSecBetweenNexts scan metric.
  protected long lastNext;
  // Keep lastResult returned successfully in case we have to reset scanner.
  protected Result lastResult = null;
  // Size budget for one loadCache() invocation. Taken from the Scan when it is positive,
  // otherwise from the client configuration.
  protected final long maxScannerResultSize;
  private final ClusterConnection connection;
  protected final TableName tableName;
  // Handed to every ScannerCallableWithReplicas created by moveToNextRegion().
  protected final int readRpcTimeout;
  // Timeout passed to RpcRetryingCaller#callWithoutRetries for scan, close and renew calls.
  protected final int scannerTimeout;
  // True once writeScanMetrics() has published the metrics to the Scan; guards against
  // publishing twice.
  protected boolean scanMetricsPublished = false;
  // Executes the callable without retries at this layer (retries happen inside the callable).
  protected RpcRetryingCaller<Result[]> caller;
  protected RpcControllerFactory rpcControllerFactory;
  protected Configuration conf;
  // The span that was current when this scanner was constructed. It is made current again for
  // next(), renewLease() and close(), and is ended by close().
  protected final Span span;
  // The timeout on the primary. Applicable if there are multiple replicas for a region
  // In that case, we will only wait for this much timeout on the primary before going
  // to the replicas and trying the same scan. Note that the retries will still happen
  // on each replica and the first successful results will be taken. A timeout of 0 is
  // disallowed.
  protected final int primaryOperationTimeout;
  // Read from HConstants.HBASE_CLIENT_RETRIES_NUMBER; also bounds how many times loadCache()
  // will reset the scanner after a recoverable error.
  private int retries;
  // Handed to every ScannerCallableWithReplicas created by moveToNextRegion().
  protected final ExecutorService pool;
093
094  /**
095   * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
096   * row maybe changed changed.
097   * @param conf       The {@link Configuration} to use.
098   * @param scan       {@link Scan} to use in this scanner
099   * @param tableName  The table that we wish to scan
100   * @param connection Connection identifying the cluster
101   */
102  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
103    ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
104    RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
105    int scannerTimeout, int primaryOperationTimeout) throws IOException {
106    if (LOG.isTraceEnabled()) {
107      LOG.trace(
108        "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
109    }
110    this.scan = scan;
111    this.tableName = tableName;
112    this.lastNext = EnvironmentEdgeManager.currentTime();
113    this.connection = connection;
114    this.pool = pool;
115    this.primaryOperationTimeout = primaryOperationTimeout;
116    this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
117      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
118    if (scan.getMaxResultSize() > 0) {
119      this.maxScannerResultSize = scan.getMaxResultSize();
120    } else {
121      this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
122        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
123    }
124    this.readRpcTimeout = scanReadRpcTimeout;
125    this.scannerTimeout = scannerTimeout;
126
127    // check if application wants to collect scan metrics
128    initScanMetrics(scan);
129
130    // Use the caching from the Scan. If not set, use the default cache setting for this table.
131    if (this.scan.getCaching() > 0) {
132      this.caching = this.scan.getCaching();
133    } else {
134      this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
135        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
136    }
137
138    this.caller = rpcFactory.<Result[]> newCaller();
139    this.rpcControllerFactory = controllerFactory;
140
141    this.conf = conf;
142    this.span = Span.current();
143
144    this.scanResultCache = createScanResultCache(scan);
145    initCache();
146  }
147
148  protected final int getScanReplicaId() {
149    return Math.max(scan.getReplicaId(), RegionReplicaUtil.DEFAULT_REPLICA_ID);
150  }
151
152  protected ClusterConnection getConnection() {
153    return this.connection;
154  }
155
156  protected TableName getTable() {
157    return this.tableName;
158  }
159
160  protected int getRetries() {
161    return this.retries;
162  }
163
164  protected int getScannerTimeout() {
165    return this.scannerTimeout;
166  }
167
168  protected Configuration getConf() {
169    return this.conf;
170  }
171
172  protected Scan getScan() {
173    return scan;
174  }
175
176  protected ExecutorService getPool() {
177    return pool;
178  }
179
180  protected int getPrimaryOperationTimeout() {
181    return primaryOperationTimeout;
182  }
183
184  protected int getCaching() {
185    return caching;
186  }
187
188  protected long getTimestamp() {
189    return lastNext;
190  }
191
192  protected long getMaxResultSize() {
193    return maxScannerResultSize;
194  }
195
196  private void closeScanner() throws IOException {
197    if (this.callable != null) {
198      this.callable.setClose();
199      call(callable, caller, scannerTimeout, false);
200      this.callable = null;
201    }
202  }
203
  /**
   * Will be called in moveToNextRegion when currentRegion is not null, i.e. when we are leaving a
   * region we have already scanned. Abstract because for normal scan, we will start next scan
   * from the endKey of the currentRegion, and for reversed scan, we will start next scan from the
   * startKey of the currentRegion.
   * @return {@code false} if we have reached the stop row. Otherwise {@code true}.
   */
  protected abstract boolean setNewStartKey();

  /**
   * Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed
   * scan we need to create a ReversedScannerCallable.
   * @return the callable that moveToNextRegion wraps in a {@link ScannerCallableWithReplicas}
   */
  protected abstract ScannerCallable createScannerCallable();
217
218  /**
219   * Close the previous scanner and create a new ScannerCallable for the next scanner.
220   * <p>
221   * Marked as protected only because TestClientScanner need to override this method.
222   * @return false if we should terminate the scan. Otherwise
223   */
224  protected boolean moveToNextRegion() {
225    // Close the previous scanner if it's open
226    try {
227      closeScanner();
228    } catch (IOException e) {
229      // not a big deal continue
230      if (LOG.isDebugEnabled()) {
231        LOG.debug("close scanner for " + currentRegion + " failed", e);
232      }
233    }
234    if (currentRegion != null) {
235      if (!setNewStartKey()) {
236        return false;
237      }
238      scan.resetMvccReadPoint();
239      if (LOG.isTraceEnabled()) {
240        LOG.trace("Finished " + this.currentRegion);
241      }
242    }
243    if (LOG.isDebugEnabled() && this.currentRegion != null) {
244      // Only worth logging if NOT first region in scan.
245      LOG.debug(
246        "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow())
247          + "', " + (scan.includeStartRow() ? "inclusive" : "exclusive"));
248    }
249    // clear the current region, we will set a new value to it after the first call of the new
250    // callable.
251    this.currentRegion = null;
252    this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
253      createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
254      scannerTimeout, caching, conf, caller);
255    this.callable.setCaching(this.caching);
256    incRegionCountMetrics(scanMetrics);
257    return true;
258  }
259
260  boolean isAnyRPCcancelled() {
261    return callable.isAnyRPCcancelled();
262  }
263
264  private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
265    int scannerTimeout, boolean updateCurrentRegion) throws IOException {
266    if (Thread.interrupted()) {
267      throw new InterruptedIOException();
268    }
269    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
270    // we do a callWithRetries
271    Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
272    if (currentRegion == null && updateCurrentRegion) {
273      currentRegion = callable.getHRegionInfo();
274    }
275    return rrs;
276  }
277
278  /**
279   * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
280   * application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
281   * framework because it doesn't support multi-instances of the same metrics on the same machine;
282   * for scan/map reduce scenarios, we will have multiple scans running at the same time. By
283   * default, scan metrics are disabled; if the application wants to collect them, this behavior can
284   * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
285   */
286  protected void writeScanMetrics() {
287    if (this.scanMetrics == null || scanMetricsPublished) {
288      return;
289    }
290    // Publish ScanMetrics to the Scan Object.
291    // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
292    // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
293    // to Scan will be messed up.
294    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
295      ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
296    scanMetricsPublished = true;
297  }
298
299  protected void initSyncCache() {
300    cache = new ArrayDeque<>();
301  }
302
303  protected Result nextWithSyncCache() throws IOException {
304    Result result = cache.poll();
305    if (result != null) {
306      return result;
307    }
308    // If there is nothing left in the cache and the scanner is closed,
309    // return a no-op
310    if (this.closed) {
311      return null;
312    }
313
314    loadCache();
315
316    // try again to load from cache
317    result = cache.poll();
318
319    // if we exhausted this scanner before calling close, write out the scan metrics
320    if (result == null) {
321      writeScanMetrics();
322    }
323    return result;
324  }
325
326  public int getCacheSize() {
327    return cache != null ? cache.size() : 0;
328  }
329
330  private boolean scanExhausted() {
331    return callable.moreResultsForScan() == MoreResults.NO;
332  }
333
334  private boolean regionExhausted(Result[] values) {
335    // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. And in the
336    // old time we always return empty result for a open scanner operation so we add a check here to
337    // keep compatible with the old logic. Should remove the isOpenScanner in the future.
338    // 2. Server tells us that it has no more results for this region.
339    return (values.length == 0 && !callable.isHeartbeatMessage())
340      || callable.moreResultsInRegion() == MoreResults.NO;
341  }
342
343  private void closeScannerIfExhausted(boolean exhausted) throws IOException {
344    if (exhausted) {
345      closeScanner();
346    }
347  }
348
349  private void handleScanError(DoNotRetryIOException e,
350    MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
351    // An exception was thrown which makes any partial results that we were collecting
352    // invalid. The scanner will need to be reset to the beginning of a row.
353    scanResultCache.clear();
354
355    // Unfortunately, DNRIOE is used in two different semantics.
356    // (1) The first is to close the client scanner and bubble up the exception all the way
357    // to the application. This is preferred when the exception is really un-recoverable
358    // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
359    // bucket usually.
360    // (2) Second semantics is to close the current region scanner only, but continue the
361    // client scanner by overriding the exception. This is usually UnknownScannerException,
362    // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
363    // application-level ClientScanner has to continue without bubbling up the exception to
364    // the client. See RSRpcServices to see how it throws DNRIOE's.
365    // See also: HBASE-16604, HBASE-17187
366
367    // If exception is any but the list below throw it back to the client; else setup
368    // the scanner and retry.
369    Throwable cause = e.getCause();
370    if (
371      (cause != null && cause instanceof NotServingRegionException)
372        || (cause != null && cause instanceof RegionServerStoppedException)
373        || e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException
374        || e instanceof ScannerResetException || e instanceof LeaseException
375    ) {
376      // Pass. It is easier writing the if loop test as list of what is allowed rather than
377      // as a list of what is not allowed... so if in here, it means we do not throw.
378      if (retriesLeft <= 0) {
379        throw e; // no more retries
380      }
381    } else {
382      throw e;
383    }
384
385    // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
386    if (this.lastResult != null) {
387      // The region has moved. We need to open a brand new scanner at the new location.
388      // Reset the startRow to the row we've seen last so that the new scanner starts at
389      // the correct row. Otherwise we may see previously returned rows again.
390      // If the lastRow is not partial, then we should start from the next row. As now we can
391      // exclude the start row, the logic here is the same for both normal scan and reversed scan.
392      // If lastResult is partial then include it, otherwise exclude it.
393      scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow());
394    }
395    if (e instanceof OutOfOrderScannerNextException) {
396      if (retryAfterOutOfOrderException.isTrue()) {
397        retryAfterOutOfOrderException.setValue(false);
398      } else {
399        // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
400        throw new DoNotRetryIOException(
401          "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
402      }
403    }
404    // Clear region.
405    this.currentRegion = null;
406    // Set this to zero so we don't try and do an rpc and close on remote server when
407    // the exception we got was UnknownScanner or the Server is going down.
408    callable = null;
409  }
410
  /**
   * Contact the servers to load more {@link Result}s in the cache.
   * <p>
   * Keeps calling the current region scanner, moving on to the next region when one is exhausted,
   * until one of the following happens: the scan is finished (limit reached, scan exhausted, or
   * no further region), {@code caching} results have been collected, the max result size budget
   * is used up with at least one result cached, a heartbeat arrives while results are already
   * cached, or a cursor result has been produced for the caller. Recoverable
   * {@link DoNotRetryIOException}s reset the scanner, at most {@link #getRetries()} times per
   * invocation.
   * @throws IOException if a scan call fails with an error that is not recovered from here
   */
  protected void loadCache() throws IOException {
    // check if scanner was closed during previous prefetch
    if (closed) {
      return;
    }
    // Per-invocation budgets: bytes and number of Results still wanted.
    long remainingResultSize = maxScannerResultSize;
    int countdown = this.caching;
    // This is possible if we just stopped at the boundary of a region in the previous call.
    if (callable == null && !moveToNextRegion()) {
      closed = true;
      return;
    }
    // True while one more retry after an OutOfOrderScannerNextException is allowed. It is turned
    // off by handleScanError when that retry is used, and turned back on after every successful
    // call.
    MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
    // Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should
    // make sure that we are not retrying indefinitely.
    int retriesLeft = getRetries();
    for (;;) {
      Result[] values;
      try {
        // The first call on a new callable opens the scanner and also fetches data, so there is
        // no separate open step here. The current region is recorded by call() if not known yet.
        values = call(callable, caller, scannerTimeout, true);
        // When the replica switch happens, the callable has opened a scanner with the right
        // startkey on another replica and we need to pick up from there.
        if (callable.switchedToADifferentReplica()) {
          // Any accumulated partial results are no longer valid since the callable will
          // openScanner with the correct startkey and we must pick up from there
          scanResultCache.clear();
          this.currentRegion = callable.getHRegionInfo();
        }
        retryAfterOutOfOrderException.setValue(true);
      } catch (DoNotRetryIOException e) {
        // Rethrows unless the error is recoverable and retries are left. The current value of
        // retriesLeft is passed, then decremented.
        handleScanError(e, retryAfterOutOfOrderException, retriesLeft--);
        // reopen the scanner
        if (!moveToNextRegion()) {
          break;
        }
        continue;
      }
      long currentTime = EnvironmentEdgeManager.currentTime();
      if (this.scanMetrics != null) {
        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
      }
      lastNext = currentTime;
      // Groom the array of Results that we received back from the server before adding that
      // Results to the scanner's cache. If partial results are not allowed to be seen by the
      // caller, all book keeping will be performed within this method.
      int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
      Result[] resultsToAddToCache =
        scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
      // Number of rows completed by this batch; used below to decrement the scan limit.
      int numberOfCompleteRows =
        scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
      for (Result rs : resultsToAddToCache) {
        cache.add(rs);
        long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
        countdown--;
        remainingResultSize -= estimatedHeapSizeOfResult;
        addEstimatedSize(estimatedHeapSizeOfResult);
        // Remember the last Result handed to the cache so that a reset can restart after it.
        this.lastResult = rs;
      }

      // A positive limit means the scan is row-limited; shrink it by the rows just completed.
      if (scan.getLimit() > 0) {
        int newLimit = scan.getLimit() - numberOfCompleteRows;
        assert newLimit >= 0;
        scan.setLimit(newLimit);
      }
      // Limit reached, or the server says the whole scan is done: close and finish.
      if (scan.getLimit() == 0 || scanExhausted()) {
        closeScanner();
        closed = true;
        break;
      }
      boolean regionExhausted = regionExhausted(values);
      if (callable.isHeartbeatMessage()) {
        if (!cache.isEmpty()) {
          // Caller of this method just wants a Result. If we see a heartbeat message, it means
          // processing of the scan is taking a long time server side. Rather than continue to
          // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
          // unnecessary delays to the caller
          LOG.trace("Heartbeat message received and cache contains Results. "
            + "Breaking out of scan loop");
          // we know that the region has not been exhausted yet so just break without calling
          // closeScannerIfExhausted
          break;
        }
      }
      // Nothing to return yet, but the caller asked for cursor results: hand back a cursor so
      // that it can see how far the scan has progressed.
      if (cache.isEmpty() && !closed && scan.isNeedCursorResult()) {
        if (callable.isHeartbeatMessage() && callable.getCursor() != null) {
          // Use cursor row key from server
          cache.add(Result.createCursorResult(callable.getCursor()));
          break;
        }
        if (values.length > 0) {
          // It is size limit exceed and we need return the last Result's row.
          // When user setBatch and the scanner is reopened, the server may return Results that
          // user has seen and the last Result can not be seen because the number is not enough.
          // So the row keys of results may not be same, we must use the last one.
          cache.add(Result.createCursorResult(new Cursor(values[values.length - 1].getRow())));
          break;
        }
      }
      if (countdown <= 0) {
        // we have enough result.
        closeScannerIfExhausted(regionExhausted);
        break;
      }
      if (remainingResultSize <= 0) {
        if (!cache.isEmpty()) {
          closeScannerIfExhausted(regionExhausted);
          break;
        } else {
          // we have reached the max result size but we still can not find anything to return to the
          // user. Reset the maxResultSize and try again.
          remainingResultSize = maxScannerResultSize;
        }
      }
      // we are done with the current region
      if (regionExhausted) {
        if (!moveToNextRegion()) {
          closed = true;
          break;
        }
      }
    }
  }
545
546  protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
547    return;
548  }
549
550  public int getCacheCount() {
551    return cache != null ? cache.size() : 0;
552  }
553
554  @Override
555  public void close() {
556    try (Scope ignored = span.makeCurrent()) {
557      if (!scanMetricsPublished) {
558        writeScanMetrics();
559      }
560      if (callable != null) {
561        callable.setClose();
562        try {
563          call(callable, caller, scannerTimeout, false);
564        } catch (UnknownScannerException e) {
565          // We used to catch this error, interpret, and rethrow. However, we
566          // have since decided that it's not nice for a scanner's close to
567          // throw exceptions. Chances are it was just due to lease time out.
568          LOG.debug("scanner failed to close", e);
569        } catch (IOException e) {
570          /* An exception other than UnknownScanner is unexpected. */
571          LOG.warn("scanner failed to close.", e);
572          span.recordException(e);
573          span.setStatus(StatusCode.ERROR);
574        }
575        callable = null;
576      }
577      closed = true;
578      span.setStatus(StatusCode.OK);
579    } finally {
580      span.end();
581    }
582  }
583
584  @Override
585  public boolean renewLease() {
586    try (Scope ignored = span.makeCurrent()) {
587      if (callable == null) {
588        return false;
589      }
590      // do not return any rows, do not advance the scanner
591      callable.setRenew(true);
592      try {
593        this.caller.callWithoutRetries(callable, this.scannerTimeout);
594        return true;
595      } catch (Exception e) {
596        LOG.debug("scanner failed to renew lease", e);
597        span.recordException(e);
598        return false;
599      } finally {
600        callable.setRenew(false);
601      }
602    }
603  }
604
605  protected void initCache() {
606    initSyncCache();
607  }
608
609  @Override
610  public Result next() throws IOException {
611    try (Scope ignored = span.makeCurrent()) {
612      return nextWithSyncCache();
613    }
614  }
615}