001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver;
021
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.List;
028import java.util.Optional;
029import java.util.PriorityQueue;
030import java.util.concurrent.atomic.LongAdder;
031
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparator;
035import org.apache.hadoop.hbase.HBaseInterfaceAudience;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.PrivateCellUtil;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.io.TimeRange;
042import org.apache.hadoop.hbase.io.hfile.HFileScanner;
043import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
044
045/**
046 * KeyValueScanner adaptor over the Reader.  It also provides hooks into
047 * bloom filter things.
048 */
049@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
050@InterfaceStability.Evolving
051public class StoreFileScanner implements KeyValueScanner {
052  // the reader it comes from:
053  private final StoreFileReader reader;
054  private final HFileScanner hfs;
055  private Cell cur = null;
056  private boolean closed = false;
057
058  private boolean realSeekDone;
059  private boolean delayedReseek;
060  private Cell delayedSeekKV;
061
062  private final boolean enforceMVCC;
063  private final boolean hasMVCCInfo;
064  // A flag represents whether could stop skipping KeyValues for MVCC
065  // if have encountered the next row. Only used for reversed scan
066  private boolean stopSkippingKVsIfNextRow = false;
067
068  private static LongAdder seekCount;
069
070  private final boolean canOptimizeForNonNullColumn;
071
072  private final long readPt;
073
074  // Order of this scanner relative to other scanners when duplicate key-value is found.
075  // Higher values means scanner has newer data.
076  private final long scannerOrder;
077
078  /**
079   * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
080   * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
081   * @param readPt MVCC value to use to filter out the updates newer than this scanner.
082   * @param hasMVCC Set to true if underlying store file reader has MVCC info.
083   * @param scannerOrder Order of the scanner relative to other scanners. See
084   *          {@link KeyValueScanner#getScannerOrder()}.
085   * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column,
086   *          otherwise {@code false}. This is a hint for optimization.
087   */
088  public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
089      boolean hasMVCC, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) {
090    this.readPt = readPt;
091    this.reader = reader;
092    this.hfs = hfs;
093    this.enforceMVCC = useMVCC;
094    this.hasMVCCInfo = hasMVCC;
095    this.scannerOrder = scannerOrder;
096    this.canOptimizeForNonNullColumn = canOptimizeForNonNullColumn;
097    this.reader.incrementRefCount();
098  }
099
100  /**
101   * Return an array of scanners corresponding to the given set of store files.
102   */
103  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
104      boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind,
105      long readPt) throws IOException {
106    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null,
107      readPt);
108  }
109
110  /**
111   * Return an array of scanners corresponding to the given set of store files, And set the
112   * ScanQueryMatcher for each store file scanner for further optimization
113   */
114  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
115      boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop,
116      ScanQueryMatcher matcher, long readPt) throws IOException {
117    if (files.isEmpty()) {
118      return Collections.emptyList();
119    }
120    List<StoreFileScanner> scanners = new ArrayList<>(files.size());
121    boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false;
122    PriorityQueue<HStoreFile> sortedFiles =
123        new PriorityQueue<>(files.size(), StoreFileComparators.SEQ_ID);
124    for (HStoreFile file : files) {
125      // The sort function needs metadata so we need to open reader first before sorting the list.
126      file.initReader();
127      sortedFiles.add(file);
128    }
129    boolean succ = false;
130    try {
131      for (int i = 0, n = files.size(); i < n; i++) {
132        HStoreFile sf = sortedFiles.remove();
133        StoreFileScanner scanner;
134        if (usePread) {
135          scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn);
136        } else {
137          scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i,
138              canOptimizeForNonNullColumn);
139        }
140        scanners.add(scanner);
141      }
142      succ = true;
143    } finally {
144      if (!succ) {
145        for (StoreFileScanner scanner : scanners) {
146          scanner.close();
147        }
148      }
149    }
150    return scanners;
151  }
152
153  /**
154   * Get scanners for compaction. We will create a separated reader for each store file to avoid
155   * contention with normal read request.
156   */
157  public static List<StoreFileScanner> getScannersForCompaction(Collection<HStoreFile> files,
158      boolean canUseDropBehind, long readPt) throws IOException {
159    List<StoreFileScanner> scanners = new ArrayList<>(files.size());
160    List<HStoreFile> sortedFiles = new ArrayList<>(files);
161    Collections.sort(sortedFiles, StoreFileComparators.SEQ_ID);
162    boolean succ = false;
163    try {
164      for (int i = 0, n = sortedFiles.size(); i < n; i++) {
165        scanners.add(
166          sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false));
167      }
168      succ = true;
169    } finally {
170      if (!succ) {
171        for (StoreFileScanner scanner : scanners) {
172          scanner.close();
173        }
174      }
175    }
176    return scanners;
177  }
178
179  @Override
180  public String toString() {
181    return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
182  }
183
184  @Override
185  public Cell peek() {
186    return cur;
187  }
188
189  @Override
190  public Cell next() throws IOException {
191    Cell retKey = cur;
192
193    try {
194      // only seek if we aren't at the end. cur == null implies 'end'.
195      if (cur != null) {
196        hfs.next();
197        setCurrentCell(hfs.getCell());
198        if (hasMVCCInfo || this.reader.isBulkLoaded()) {
199          skipKVsNewerThanReadpoint();
200        }
201      }
202    } catch (FileNotFoundException e) {
203      throw e;
204    } catch(IOException e) {
205      throw new IOException("Could not iterate " + this, e);
206    }
207    return retKey;
208  }
209
210  @Override
211  public boolean seek(Cell key) throws IOException {
212    if (seekCount != null) seekCount.increment();
213
214    try {
215      try {
216        if(!seekAtOrAfter(hfs, key)) {
217          this.cur = null;
218          return false;
219        }
220
221        setCurrentCell(hfs.getCell());
222
223        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
224          return skipKVsNewerThanReadpoint();
225        } else {
226          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
227        }
228      } finally {
229        realSeekDone = true;
230      }
231    } catch (FileNotFoundException e) {
232      throw e;
233    } catch (IOException ioe) {
234      throw new IOException("Could not seek " + this + " to key " + key, ioe);
235    }
236  }
237
238  @Override
239  public boolean reseek(Cell key) throws IOException {
240    if (seekCount != null) seekCount.increment();
241
242    try {
243      try {
244        if (!reseekAtOrAfter(hfs, key)) {
245          this.cur = null;
246          return false;
247        }
248        setCurrentCell(hfs.getCell());
249
250        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
251          return skipKVsNewerThanReadpoint();
252        } else {
253          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
254        }
255      } finally {
256        realSeekDone = true;
257      }
258    } catch (FileNotFoundException e) {
259      throw e;
260    } catch (IOException ioe) {
261      throw new IOException("Could not reseek " + this + " to key " + key,
262          ioe);
263    }
264  }
265
266  protected void setCurrentCell(Cell newVal) throws IOException {
267    this.cur = newVal;
268    if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
269      PrivateCellUtil.setSequenceId(cur, this.reader.getSequenceID());
270    }
271  }
272
273  protected boolean skipKVsNewerThanReadpoint() throws IOException {
274    // We want to ignore all key-values that are newer than our current
275    // readPoint
276    Cell startKV = cur;
277    while(enforceMVCC
278        && cur != null
279        && (cur.getSequenceId() > readPt)) {
280      boolean hasNext = hfs.next();
281      setCurrentCell(hfs.getCell());
282      if (hasNext && this.stopSkippingKVsIfNextRow
283          && getComparator().compareRows(cur, startKV) > 0) {
284        return false;
285      }
286    }
287
288    if (cur == null) {
289      return false;
290    }
291
292    return true;
293  }
294
295  @Override
296  public void close() {
297    if (closed) return;
298    cur = null;
299    this.hfs.close();
300    if (this.reader != null) {
301      this.reader.readCompleted();
302    }
303    closed = true;
304  }
305
306  /**
307   *
308   * @param s
309   * @param k
310   * @return false if not found or if k is after the end.
311   * @throws IOException
312   */
313  public static boolean seekAtOrAfter(HFileScanner s, Cell k)
314  throws IOException {
315    int result = s.seekTo(k);
316    if(result < 0) {
317      if (result == HConstants.INDEX_KEY_MAGIC) {
318        // using faked key
319        return true;
320      }
321      // Passed KV is smaller than first KV in file, work from start of file
322      return s.seekTo();
323    } else if(result > 0) {
324      // Passed KV is larger than current KV in file, if there is a next
325      // it is the "after", if not then this scanner is done.
326      return s.next();
327    }
328    // Seeked to the exact key
329    return true;
330  }
331
332  static boolean reseekAtOrAfter(HFileScanner s, Cell k)
333  throws IOException {
334    //This function is similar to seekAtOrAfter function
335    int result = s.reseekTo(k);
336    if (result <= 0) {
337      if (result == HConstants.INDEX_KEY_MAGIC) {
338        // using faked key
339        return true;
340      }
341      // If up to now scanner is not seeked yet, this means passed KV is smaller
342      // than first KV in file, and it is the first time we seek on this file.
343      // So we also need to work from the start of file.
344      if (!s.isSeeked()) {
345        return  s.seekTo();
346      }
347      return true;
348    }
349    // passed KV is larger than current KV in file, if there is a next
350    // it is after, if not then this scanner is done.
351    return s.next();
352  }
353
354  /**
355   * @see KeyValueScanner#getScannerOrder()
356   */
357  @Override
358  public long getScannerOrder() {
359    return scannerOrder;
360  }
361
362  /**
363   * Pretend we have done a seek but don't do it yet, if possible. The hope is
364   * that we find requested columns in more recent files and won't have to seek
365   * in older files. Creates a fake key/value with the given row/column and the
366   * highest (most recent) possible timestamp we might get from this file. When
367   * users of such "lazy scanner" need to know the next KV precisely (e.g. when
368   * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
369   * <p>
370   * Note that this function does guarantee that the current KV of this scanner
371   * will be advanced to at least the given KV. Because of this, it does have
372   * to do a real seek in cases when the seek timestamp is older than the
373   * highest timestamp of the file, e.g. when we are trying to seek to the next
374   * row/column and use OLDEST_TIMESTAMP in the seek key.
375   */
376  @Override
377  public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
378      throws IOException {
379    if (kv.getFamilyLength() == 0) {
380      useBloom = false;
381    }
382
383    boolean haveToSeek = true;
384    if (useBloom) {
385      // check ROWCOL Bloom filter first.
386      if (reader.getBloomFilterType() == BloomType.ROWCOL) {
387        haveToSeek = reader.passesGeneralRowColBloomFilter(kv);
388      } else if (canOptimizeForNonNullColumn
389          && ((PrivateCellUtil.isDeleteFamily(kv)
390              || PrivateCellUtil.isDeleteFamilyVersion(kv)))) {
391        // if there is no such delete family kv in the store file,
392        // then no need to seek.
393        haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), kv.getRowOffset(),
394          kv.getRowLength());
395      }
396    }
397
398    delayedReseek = forward;
399    delayedSeekKV = kv;
400
401    if (haveToSeek) {
402      // This row/column might be in this store file (or we did not use the
403      // Bloom filter), so we still need to seek.
404      realSeekDone = false;
405      long maxTimestampInFile = reader.getMaxTimestamp();
406      long seekTimestamp = kv.getTimestamp();
407      if (seekTimestamp > maxTimestampInFile) {
408        // Create a fake key that is not greater than the real next key.
409        // (Lower timestamps correspond to higher KVs.)
410        // To understand this better, consider that we are asked to seek to
411        // a higher timestamp than the max timestamp in this file. We know that
412        // the next point when we have to consider this file again is when we
413        // pass the max timestamp of this file (with the same row/column).
414        setCurrentCell(PrivateCellUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
415      } else {
416        // This will be the case e.g. when we need to seek to the next
417        // row/column, and we don't know exactly what they are, so we set the
418        // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
419        // row/column.
420        enforceSeek();
421      }
422      return cur != null;
423    }
424
425    // Multi-column Bloom filter optimization.
426    // Create a fake key/value, so that this scanner only bubbles up to the top
427    // of the KeyValueHeap in StoreScanner after we scanned this row/column in
428    // all other store files. The query matcher will then just skip this fake
429    // key/value and the store scanner will progress to the next column. This
430    // is obviously not a "real real" seek, but unlike the fake KV earlier in
431    // this method, we want this to be propagated to ScanQueryMatcher.
432    setCurrentCell(PrivateCellUtil.createLastOnRowCol(kv));
433
434    realSeekDone = true;
435    return true;
436  }
437
438  StoreFileReader getReader() {
439    return reader;
440  }
441
442  CellComparator getComparator() {
443    return reader.getComparator();
444  }
445
446  @Override
447  public boolean realSeekDone() {
448    return realSeekDone;
449  }
450
451  @Override
452  public void enforceSeek() throws IOException {
453    if (realSeekDone)
454      return;
455
456    if (delayedReseek) {
457      reseek(delayedSeekKV);
458    } else {
459      seek(delayedSeekKV);
460    }
461  }
462
463  @Override
464  public boolean isFileScanner() {
465    return true;
466  }
467
468  @Override
469  public Path getFilePath() {
470    return reader.getHFileReader().getPath();
471  }
472
473  // Test methods
474  static final long getSeekCount() {
475    return seekCount.sum();
476  }
477
478  static final void instrument() {
479    seekCount = new LongAdder();
480  }
481
482  @Override
483  public boolean shouldUseScanner(Scan scan, HStore store, long oldestUnexpiredTS) {
484    // if the file has no entries, no need to validate or create a scanner.
485    byte[] cf = store.getColumnFamilyDescriptor().getName();
486    TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
487    if (timeRange == null) {
488      timeRange = scan.getTimeRange();
489    }
490    return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
491        .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
492  }
493
494  @Override
495  public boolean seekToPreviousRow(Cell originalKey) throws IOException {
496    try {
497      try {
498        boolean keepSeeking = false;
499        Cell key = originalKey;
500        do {
501          Cell seekKey = PrivateCellUtil.createFirstOnRow(key);
502          if (seekCount != null) seekCount.increment();
503          if (!hfs.seekBefore(seekKey)) {
504            this.cur = null;
505            return false;
506          }
507          Cell curCell = hfs.getCell();
508          Cell firstKeyOfPreviousRow = PrivateCellUtil.createFirstOnRow(curCell);
509
510          if (seekCount != null) seekCount.increment();
511          if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
512            this.cur = null;
513            return false;
514          }
515
516          setCurrentCell(hfs.getCell());
517          this.stopSkippingKVsIfNextRow = true;
518          boolean resultOfSkipKVs;
519          try {
520            resultOfSkipKVs = skipKVsNewerThanReadpoint();
521          } finally {
522            this.stopSkippingKVsIfNextRow = false;
523          }
524          if (!resultOfSkipKVs
525              || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
526            keepSeeking = true;
527            key = firstKeyOfPreviousRow;
528            continue;
529          } else {
530            keepSeeking = false;
531          }
532        } while (keepSeeking);
533        return true;
534      } finally {
535        realSeekDone = true;
536      }
537    } catch (FileNotFoundException e) {
538      throw e;
539    } catch (IOException ioe) {
540      throw new IOException("Could not seekToPreviousRow " + this + " to key "
541          + originalKey, ioe);
542    }
543  }
544
545  @Override
546  public boolean seekToLastRow() throws IOException {
547    Optional<byte[]> lastRow = reader.getLastRowKey();
548    if (!lastRow.isPresent()) {
549      return false;
550    }
551    Cell seekKey = PrivateCellUtil.createFirstOnRow(lastRow.get());
552    if (seek(seekKey)) {
553      return true;
554    } else {
555      return seekToPreviousRow(seekKey);
556    }
557  }
558
559  @Override
560  public boolean backwardSeek(Cell key) throws IOException {
561    seek(key);
562    if (cur == null
563        || getComparator().compareRows(cur, key) > 0) {
564      return seekToPreviousRow(key);
565    }
566    return true;
567  }
568
569  @Override
570  public Cell getNextIndexedKey() {
571    return hfs.getNextIndexedKey();
572  }
573
574  @Override
575  public void shipped() throws IOException {
576    this.hfs.shipped();
577  }
578}