001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.List;
026import java.util.Optional;
027import java.util.PriorityQueue;
028import java.util.concurrent.atomic.LongAdder;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellComparator;
032import org.apache.hadoop.hbase.HBaseInterfaceAudience;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.PrivateCellUtil;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.io.TimeRange;
037import org.apache.hadoop.hbase.io.hfile.HFileScanner;
038import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.yetus.audience.InterfaceStability;
041
042/**
043 * KeyValueScanner adaptor over the Reader. It also provides hooks into bloom filter things.
044 */
045@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
046@InterfaceStability.Evolving
047public class StoreFileScanner implements KeyValueScanner {
048  // the reader it comes from:
049  private final StoreFileReader reader;
050  private final HFileScanner hfs;
051  private Cell cur = null;
052  private boolean closed = false;
053
054  private boolean realSeekDone;
055  private boolean delayedReseek;
056  private Cell delayedSeekKV;
057
058  private final boolean enforceMVCC;
059  private final boolean hasMVCCInfo;
060  // A flag represents whether could stop skipping KeyValues for MVCC
061  // if have encountered the next row. Only used for reversed scan
062  private boolean stopSkippingKVsIfNextRow = false;
063
064  private static LongAdder seekCount;
065
066  private final boolean canOptimizeForNonNullColumn;
067
068  private final long readPt;
069
070  // Order of this scanner relative to other scanners when duplicate key-value is found.
071  // Higher values means scanner has newer data.
072  private final long scannerOrder;
073
074  /**
075   * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
076   * @param useMVCC                     If true, scanner will filter out updates with MVCC larger
077   *                                    than {@code readPt}.
078   * @param readPt                      MVCC value to use to filter out the updates newer than this
079   *                                    scanner.
080   * @param hasMVCC                     Set to true if underlying store file reader has MVCC info.
081   * @param scannerOrder                Order of the scanner relative to other scanners. See
082   *                                    {@link KeyValueScanner#getScannerOrder()}.
083   * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column,
084   *                                    otherwise {@code false}. This is a hint for optimization.
085   */
086  public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
087    boolean hasMVCC, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) {
088    this.readPt = readPt;
089    this.reader = reader;
090    this.hfs = hfs;
091    this.enforceMVCC = useMVCC;
092    this.hasMVCCInfo = hasMVCC;
093    this.scannerOrder = scannerOrder;
094    this.canOptimizeForNonNullColumn = canOptimizeForNonNullColumn;
095    this.reader.incrementRefCount();
096  }
097
098  /**
099   * Return an array of scanners corresponding to the given set of store files.
100   */
101  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
102    boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind, long readPt)
103    throws IOException {
104    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null,
105      readPt);
106  }
107
108  /**
109   * Return an array of scanners corresponding to the given set of store files, And set the
110   * ScanQueryMatcher for each store file scanner for further optimization
111   */
112  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
113    boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop,
114    ScanQueryMatcher matcher, long readPt) throws IOException {
115    if (files.isEmpty()) {
116      return Collections.emptyList();
117    }
118    List<StoreFileScanner> scanners = new ArrayList<>(files.size());
119    boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false;
120    PriorityQueue<HStoreFile> sortedFiles =
121      new PriorityQueue<>(files.size(), StoreFileComparators.SEQ_ID);
122    for (HStoreFile file : files) {
123      // The sort function needs metadata so we need to open reader first before sorting the list.
124      file.initReader();
125      sortedFiles.add(file);
126    }
127    boolean succ = false;
128    try {
129      for (int i = 0, n = files.size(); i < n; i++) {
130        HStoreFile sf = sortedFiles.remove();
131        StoreFileScanner scanner;
132        if (usePread) {
133          scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn);
134        } else {
135          scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i,
136            canOptimizeForNonNullColumn);
137        }
138        scanners.add(scanner);
139      }
140      succ = true;
141    } finally {
142      if (!succ) {
143        for (StoreFileScanner scanner : scanners) {
144          scanner.close();
145        }
146      }
147    }
148    return scanners;
149  }
150
151  /**
152   * Get scanners for compaction. We will create a separated reader for each store file to avoid
153   * contention with normal read request.
154   */
155  public static List<StoreFileScanner> getScannersForCompaction(Collection<HStoreFile> files,
156    boolean canUseDropBehind, long readPt) throws IOException {
157    List<StoreFileScanner> scanners = new ArrayList<>(files.size());
158    List<HStoreFile> sortedFiles = new ArrayList<>(files);
159    Collections.sort(sortedFiles, StoreFileComparators.SEQ_ID);
160    boolean succ = false;
161    try {
162      for (int i = 0, n = sortedFiles.size(); i < n; i++) {
163        scanners.add(
164          sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false));
165      }
166      succ = true;
167    } finally {
168      if (!succ) {
169        for (StoreFileScanner scanner : scanners) {
170          scanner.close();
171        }
172      }
173    }
174    return scanners;
175  }
176
177  @Override
178  public String toString() {
179    return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
180  }
181
182  @Override
183  public Cell peek() {
184    return cur;
185  }
186
187  @Override
188  public Cell next() throws IOException {
189    Cell retKey = cur;
190
191    try {
192      // only seek if we aren't at the end. cur == null implies 'end'.
193      if (cur != null) {
194        hfs.next();
195        setCurrentCell(hfs.getCell());
196        if (hasMVCCInfo || this.reader.isBulkLoaded()) {
197          skipKVsNewerThanReadpoint();
198        }
199      }
200    } catch (FileNotFoundException e) {
201      throw e;
202    } catch (IOException e) {
203      throw new IOException("Could not iterate " + this, e);
204    }
205    return retKey;
206  }
207
208  @Override
209  public boolean seek(Cell key) throws IOException {
210    if (seekCount != null) seekCount.increment();
211
212    try {
213      try {
214        if (!seekAtOrAfter(hfs, key)) {
215          this.cur = null;
216          return false;
217        }
218
219        setCurrentCell(hfs.getCell());
220
221        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
222          return skipKVsNewerThanReadpoint();
223        } else {
224          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
225        }
226      } finally {
227        realSeekDone = true;
228      }
229    } catch (FileNotFoundException e) {
230      throw e;
231    } catch (IOException ioe) {
232      throw new IOException("Could not seek " + this + " to key " + key, ioe);
233    }
234  }
235
236  @Override
237  public boolean reseek(Cell key) throws IOException {
238    if (seekCount != null) seekCount.increment();
239
240    try {
241      try {
242        if (!reseekAtOrAfter(hfs, key)) {
243          this.cur = null;
244          return false;
245        }
246        setCurrentCell(hfs.getCell());
247
248        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
249          return skipKVsNewerThanReadpoint();
250        } else {
251          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
252        }
253      } finally {
254        realSeekDone = true;
255      }
256    } catch (FileNotFoundException e) {
257      throw e;
258    } catch (IOException ioe) {
259      throw new IOException("Could not reseek " + this + " to key " + key, ioe);
260    }
261  }
262
263  protected void setCurrentCell(Cell newVal) throws IOException {
264    this.cur = newVal;
265    if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
266      PrivateCellUtil.setSequenceId(cur, this.reader.getSequenceID());
267    }
268  }
269
270  protected boolean skipKVsNewerThanReadpoint() throws IOException {
271    // We want to ignore all key-values that are newer than our current
272    // readPoint
273    Cell startKV = cur;
274    while (enforceMVCC && cur != null && (cur.getSequenceId() > readPt)) {
275      boolean hasNext = hfs.next();
276      setCurrentCell(hfs.getCell());
277      if (
278        hasNext && this.stopSkippingKVsIfNextRow && getComparator().compareRows(cur, startKV) > 0
279      ) {
280        return false;
281      }
282    }
283
284    if (cur == null) {
285      return false;
286    }
287
288    return true;
289  }
290
291  @Override
292  public void close() {
293    if (closed) return;
294    cur = null;
295    this.hfs.close();
296    if (this.reader != null) {
297      this.reader.readCompleted();
298    }
299    closed = true;
300  }
301
302  /**
303   * nn * @return false if not found or if k is after the end. n
304   */
305  public static boolean seekAtOrAfter(HFileScanner s, Cell k) throws IOException {
306    int result = s.seekTo(k);
307    if (result < 0) {
308      if (result == HConstants.INDEX_KEY_MAGIC) {
309        // using faked key
310        return true;
311      }
312      // Passed KV is smaller than first KV in file, work from start of file
313      return s.seekTo();
314    } else if (result > 0) {
315      // Passed KV is larger than current KV in file, if there is a next
316      // it is the "after", if not then this scanner is done.
317      return s.next();
318    }
319    // Seeked to the exact key
320    return true;
321  }
322
323  static boolean reseekAtOrAfter(HFileScanner s, Cell k) throws IOException {
324    // This function is similar to seekAtOrAfter function
325    int result = s.reseekTo(k);
326    if (result <= 0) {
327      if (result == HConstants.INDEX_KEY_MAGIC) {
328        // using faked key
329        return true;
330      }
331      // If up to now scanner is not seeked yet, this means passed KV is smaller
332      // than first KV in file, and it is the first time we seek on this file.
333      // So we also need to work from the start of file.
334      if (!s.isSeeked()) {
335        return s.seekTo();
336      }
337      return true;
338    }
339    // passed KV is larger than current KV in file, if there is a next
340    // it is after, if not then this scanner is done.
341    return s.next();
342  }
343
344  /**
345   * @see KeyValueScanner#getScannerOrder()
346   */
347  @Override
348  public long getScannerOrder() {
349    return scannerOrder;
350  }
351
352  /**
353   * Pretend we have done a seek but don't do it yet, if possible. The hope is that we find
354   * requested columns in more recent files and won't have to seek in older files. Creates a fake
355   * key/value with the given row/column and the highest (most recent) possible timestamp we might
356   * get from this file. When users of such "lazy scanner" need to know the next KV precisely (e.g.
357   * when this scanner is at the top of the heap), they run {@link #enforceSeek()}.
358   * <p>
359   * Note that this function does guarantee that the current KV of this scanner will be advanced to
360   * at least the given KV. Because of this, it does have to do a real seek in cases when the seek
361   * timestamp is older than the highest timestamp of the file, e.g. when we are trying to seek to
362   * the next row/column and use OLDEST_TIMESTAMP in the seek key.
363   */
364  @Override
365  public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException {
366    if (kv.getFamilyLength() == 0) {
367      useBloom = false;
368    }
369
370    boolean haveToSeek = true;
371    if (useBloom) {
372      // check ROWCOL Bloom filter first.
373      if (reader.getBloomFilterType() == BloomType.ROWCOL) {
374        haveToSeek = reader.passesGeneralRowColBloomFilter(kv);
375      } else if (
376        canOptimizeForNonNullColumn
377          && ((PrivateCellUtil.isDeleteFamily(kv) || PrivateCellUtil.isDeleteFamilyVersion(kv)))
378      ) {
379        // if there is no such delete family kv in the store file,
380        // then no need to seek.
381        haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), kv.getRowOffset(),
382          kv.getRowLength());
383      }
384    }
385
386    delayedReseek = forward;
387    delayedSeekKV = kv;
388
389    if (haveToSeek) {
390      // This row/column might be in this store file (or we did not use the
391      // Bloom filter), so we still need to seek.
392      realSeekDone = false;
393      long maxTimestampInFile = reader.getMaxTimestamp();
394      long seekTimestamp = kv.getTimestamp();
395      if (seekTimestamp > maxTimestampInFile) {
396        // Create a fake key that is not greater than the real next key.
397        // (Lower timestamps correspond to higher KVs.)
398        // To understand this better, consider that we are asked to seek to
399        // a higher timestamp than the max timestamp in this file. We know that
400        // the next point when we have to consider this file again is when we
401        // pass the max timestamp of this file (with the same row/column).
402        setCurrentCell(PrivateCellUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
403      } else {
404        // This will be the case e.g. when we need to seek to the next
405        // row/column, and we don't know exactly what they are, so we set the
406        // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
407        // row/column.
408        enforceSeek();
409      }
410      return cur != null;
411    }
412
413    // Multi-column Bloom filter optimization.
414    // Create a fake key/value, so that this scanner only bubbles up to the top
415    // of the KeyValueHeap in StoreScanner after we scanned this row/column in
416    // all other store files. The query matcher will then just skip this fake
417    // key/value and the store scanner will progress to the next column. This
418    // is obviously not a "real real" seek, but unlike the fake KV earlier in
419    // this method, we want this to be propagated to ScanQueryMatcher.
420    setCurrentCell(PrivateCellUtil.createLastOnRowCol(kv));
421
422    realSeekDone = true;
423    return true;
424  }
425
426  StoreFileReader getReader() {
427    return reader;
428  }
429
430  CellComparator getComparator() {
431    return reader.getComparator();
432  }
433
434  @Override
435  public boolean realSeekDone() {
436    return realSeekDone;
437  }
438
439  @Override
440  public void enforceSeek() throws IOException {
441    if (realSeekDone) return;
442
443    if (delayedReseek) {
444      reseek(delayedSeekKV);
445    } else {
446      seek(delayedSeekKV);
447    }
448  }
449
450  @Override
451  public boolean isFileScanner() {
452    return true;
453  }
454
455  @Override
456  public Path getFilePath() {
457    return reader.getHFileReader().getPath();
458  }
459
460  // Test methods
461  static final long getSeekCount() {
462    return seekCount.sum();
463  }
464
465  static final void instrument() {
466    seekCount = new LongAdder();
467  }
468
469  @Override
470  public boolean shouldUseScanner(Scan scan, HStore store, long oldestUnexpiredTS) {
471    // if the file has no entries, no need to validate or create a scanner.
472    byte[] cf = store.getColumnFamilyDescriptor().getName();
473    TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
474    if (timeRange == null) {
475      timeRange = scan.getTimeRange();
476    }
477    return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS)
478      && reader.passesKeyRangeFilter(scan)
479      && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
480  }
481
482  @Override
483  public boolean seekToPreviousRow(Cell originalKey) throws IOException {
484    try {
485      try {
486        boolean keepSeeking = false;
487        Cell key = originalKey;
488        do {
489          Cell seekKey = PrivateCellUtil.createFirstOnRow(key);
490          if (seekCount != null) seekCount.increment();
491          if (!hfs.seekBefore(seekKey)) {
492            this.cur = null;
493            return false;
494          }
495          Cell curCell = hfs.getCell();
496          Cell firstKeyOfPreviousRow = PrivateCellUtil.createFirstOnRow(curCell);
497
498          if (seekCount != null) seekCount.increment();
499          if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
500            this.cur = null;
501            return false;
502          }
503
504          setCurrentCell(hfs.getCell());
505          this.stopSkippingKVsIfNextRow = true;
506          boolean resultOfSkipKVs;
507          try {
508            resultOfSkipKVs = skipKVsNewerThanReadpoint();
509          } finally {
510            this.stopSkippingKVsIfNextRow = false;
511          }
512          if (!resultOfSkipKVs || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
513            keepSeeking = true;
514            key = firstKeyOfPreviousRow;
515            continue;
516          } else {
517            keepSeeking = false;
518          }
519        } while (keepSeeking);
520        return true;
521      } finally {
522        realSeekDone = true;
523      }
524    } catch (FileNotFoundException e) {
525      throw e;
526    } catch (IOException ioe) {
527      throw new IOException("Could not seekToPreviousRow " + this + " to key " + originalKey, ioe);
528    }
529  }
530
531  @Override
532  public boolean seekToLastRow() throws IOException {
533    Optional<byte[]> lastRow = reader.getLastRowKey();
534    if (!lastRow.isPresent()) {
535      return false;
536    }
537    Cell seekKey = PrivateCellUtil.createFirstOnRow(lastRow.get());
538    if (seek(seekKey)) {
539      return true;
540    } else {
541      return seekToPreviousRow(seekKey);
542    }
543  }
544
545  @Override
546  public boolean backwardSeek(Cell key) throws IOException {
547    seek(key);
548    if (cur == null || getComparator().compareRows(cur, key) > 0) {
549      return seekToPreviousRow(key);
550    }
551    return true;
552  }
553
554  @Override
555  public Cell getNextIndexedKey() {
556    return hfs.getNextIndexedKey();
557  }
558
559  @Override
560  public void shipped() throws IOException {
561    this.hfs.shipped();
562  }
563}