View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.concurrent.atomic.LongAdder;
29
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellComparator;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.KeyValue;
35  import org.apache.hadoop.hbase.KeyValueUtil;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.io.TimeRange;
39  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
40  import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
41
42  /**
43   * KeyValueScanner adaptor over the Reader.  It also provides hooks into
44   * bloom filter things.
45   */
46  @InterfaceAudience.LimitedPrivate("Coprocessor")
47  public class StoreFileScanner implements KeyValueScanner {
// The store file reader this scanner was obtained from.
private final StoreFileReader reader;
// Underlying HFile scanner that all seek/next calls are delegated to.
private final HFileScanner hfs;
// Cell the scanner is currently positioned on; null means the scanner is at the end.
private Cell cur = null;
// Set by close() so that closing a second time is a no-op.
private boolean closed = false;

// False while a seek requested through requestSeek() is still pending (lazy seek).
private boolean realSeekDone;
// Whether the pending lazy seek must be carried out with reseek() rather than seek().
private boolean delayedReseek;
// Key of the pending lazy seek, consumed by enforceSeek().
private Cell delayedSeekKV;

// When true, cells with a sequence id greater than readPt are skipped.
private final boolean enforceMVCC;
// Constructor hint: whether the underlying store file reader has MVCC info.
private final boolean hasMVCCInfo;
// Flag telling skipKVsNewerThanReadpoint() to stop skipping cells for MVCC
// as soon as it reaches the next row. Only used for reversed scans.
private boolean stopSkippingKVsIfNextRow = false;

// Seek counter for tests; stays null until instrument() is called.
private static LongAdder seekCount;

// Hint that the query has no null column, which enables the delete-family Bloom
// filter shortcut in requestSeek().
private final boolean canOptimizeForNonNullColumn;

// MVCC read point: with enforceMVCC, cells with a larger sequence id are not returned.
private final long readPt;

// Order of this scanner relative to other scanners when duplicate key-value is found.
// Higher values means scanner has newer data.
private final long scannerOrder;
73
/**
 * Builds a {@link KeyValueScanner} that reads through the supplied {@link HFileScanner}.
 * @param reader The store file reader this scanner belongs to.
 * @param hfs The HFile scanner that performs the actual positioning.
 * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
 * @param hasMVCC Set to true if underlying store file reader has MVCC info.
 * @param readPt MVCC value to use to filter out the updates newer than this scanner.
 * @param scannerOrder Order of the scanner relative to other scanners. See
 *          {@link KeyValueScanner#getScannerOrder()}.
 * @param canOptimizeForNonNullColumn {@code true} if we can make sure there is no null column,
 *          otherwise {@code false}. This is a hint for optimization.
 */
public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
    boolean hasMVCC, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) {
  // Where the data comes from.
  this.reader = reader;
  this.hfs = hfs;
  // MVCC configuration.
  this.enforceMVCC = useMVCC;
  this.hasMVCCInfo = hasMVCC;
  this.readPt = readPt;
  // Ordering and optimization hints.
  this.scannerOrder = scannerOrder;
  this.canOptimizeForNonNullColumn = canOptimizeForNonNullColumn;
}
94
95    boolean isPrimaryReplica() {
96      return reader.isPrimaryReplicaReader();
97    }
98
99    /**
100    * Return an array of scanners corresponding to the given
101    * set of store files.
102    */
103   public static List<StoreFileScanner> getScannersForStoreFiles(
104       Collection<StoreFile> files,
105       boolean cacheBlocks,
106       boolean usePread, long readPt) throws IOException {
107     return getScannersForStoreFiles(files, cacheBlocks,
108                                    usePread, false, false, readPt);
109   }
110
111   /**
112    * Return an array of scanners corresponding to the given set of store files.
113    */
114   public static List<StoreFileScanner> getScannersForStoreFiles(
115       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
116       boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
117     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
118         useDropBehind, null, readPt);
119   }
120
121   /**
122    * Return an array of scanners corresponding to the given set of store files, And set the
123    * ScanQueryMatcher for each store file scanner for further optimization
124    */
125   public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
126       boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop,
127       ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
128     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(files.size());
129     List<StoreFile> sorted_files = new ArrayList<>(files);
130     Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID);
131     for (int i = 0; i < sorted_files.size(); i++) {
132       StoreFileReader r = sorted_files.get(i).createReader();
133       r.setReplicaStoreFile(isPrimaryReplica);
134       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt,
135         i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
136       scanners.add(scanner);
137     }
138     return scanners;
139   }
140
141   public static List<StoreFileScanner> getScannersForStoreFiles(
142     Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
143     boolean isCompaction, boolean canUseDrop,
144     ScanQueryMatcher matcher, long readPt) throws IOException {
145     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
146       matcher, readPt, true);
147   }
148
149   public String toString() {
150     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
151   }
152
153   public Cell peek() {
154     return cur;
155   }
156
157   public Cell next() throws IOException {
158     Cell retKey = cur;
159
160     try {
161       // only seek if we aren't at the end. cur == null implies 'end'.
162       if (cur != null) {
163         hfs.next();
164         setCurrentCell(hfs.getCell());
165         if (hasMVCCInfo || this.reader.isBulkLoaded()) {
166           skipKVsNewerThanReadpoint();
167         }
168       }
169     } catch (FileNotFoundException e) {
170       throw e;
171     } catch(IOException e) {
172       throw new IOException("Could not iterate " + this, e);
173     }
174     return retKey;
175   }
176
177   public boolean seek(Cell key) throws IOException {
178     if (seekCount != null) seekCount.increment();
179
180     try {
181       try {
182         if(!seekAtOrAfter(hfs, key)) {
183           this.cur = null;
184           return false;
185         }
186
187         setCurrentCell(hfs.getCell());
188
189         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
190           return skipKVsNewerThanReadpoint();
191         } else {
192           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
193         }
194       } finally {
195         realSeekDone = true;
196       }
197     } catch (FileNotFoundException e) {
198       throw e;
199     } catch (IOException ioe) {
200       throw new IOException("Could not seek " + this + " to key " + key, ioe);
201     }
202   }
203
204   public boolean reseek(Cell key) throws IOException {
205     if (seekCount != null) seekCount.increment();
206
207     try {
208       try {
209         if (!reseekAtOrAfter(hfs, key)) {
210           this.cur = null;
211           return false;
212         }
213         setCurrentCell(hfs.getCell());
214
215         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
216           return skipKVsNewerThanReadpoint();
217         } else {
218           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
219         }
220       } finally {
221         realSeekDone = true;
222       }
223     } catch (FileNotFoundException e) {
224       throw e;
225     } catch (IOException ioe) {
226       throw new IOException("Could not reseek " + this + " to key " + key,
227           ioe);
228     }
229   }
230
231   protected void setCurrentCell(Cell newVal) throws IOException {
232     this.cur = newVal;
233     if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
234       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
235     }
236   }
237
238   protected boolean skipKVsNewerThanReadpoint() throws IOException {
239     // We want to ignore all key-values that are newer than our current
240     // readPoint
241     Cell startKV = cur;
242     while(enforceMVCC
243         && cur != null
244         && (cur.getSequenceId() > readPt)) {
245       boolean hasNext = hfs.next();
246       setCurrentCell(hfs.getCell());
247       if (hasNext && this.stopSkippingKVsIfNextRow
248           && getComparator().compareRows(cur, startKV) > 0) {
249         return false;
250       }
251     }
252
253     if (cur == null) {
254       return false;
255     }
256
257     return true;
258   }
259
260   public void close() {
261     if (closed) return;
262     cur = null;
263     this.hfs.close();
264     if (this.reader != null) {
265       this.reader.decrementRefCount();
266     }
267     closed = true;
268   }
269
270   /**
271    *
272    * @param s
273    * @param k
274    * @return false if not found or if k is after the end.
275    * @throws IOException
276    */
277   public static boolean seekAtOrAfter(HFileScanner s, Cell k)
278   throws IOException {
279     int result = s.seekTo(k);
280     if(result < 0) {
281       if (result == HConstants.INDEX_KEY_MAGIC) {
282         // using faked key
283         return true;
284       }
285       // Passed KV is smaller than first KV in file, work from start of file
286       return s.seekTo();
287     } else if(result > 0) {
288       // Passed KV is larger than current KV in file, if there is a next
289       // it is the "after", if not then this scanner is done.
290       return s.next();
291     }
292     // Seeked to the exact key
293     return true;
294   }
295
296   static boolean reseekAtOrAfter(HFileScanner s, Cell k)
297   throws IOException {
298     //This function is similar to seekAtOrAfter function
299     int result = s.reseekTo(k);
300     if (result <= 0) {
301       if (result == HConstants.INDEX_KEY_MAGIC) {
302         // using faked key
303         return true;
304       }
305       // If up to now scanner is not seeked yet, this means passed KV is smaller
306       // than first KV in file, and it is the first time we seek on this file.
307       // So we also need to work from the start of file.
308       if (!s.isSeeked()) {
309         return  s.seekTo();
310       }
311       return true;
312     }
313     // passed KV is larger than current KV in file, if there is a next
314     // it is after, if not then this scanner is done.
315     return s.next();
316   }
317
318   /**
319    * @see KeyValueScanner#getScannerOrder()
320    */
321   @Override
322   public long getScannerOrder() {
323     return scannerOrder;
324   }
325
326   /**
327    * Pretend we have done a seek but don't do it yet, if possible. The hope is
328    * that we find requested columns in more recent files and won't have to seek
329    * in older files. Creates a fake key/value with the given row/column and the
330    * highest (most recent) possible timestamp we might get from this file. When
331    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
332    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
333    * <p>
334    * Note that this function does guarantee that the current KV of this scanner
335    * will be advanced to at least the given KV. Because of this, it does have
336    * to do a real seek in cases when the seek timestamp is older than the
337    * highest timestamp of the file, e.g. when we are trying to seek to the next
338    * row/column and use OLDEST_TIMESTAMP in the seek key.
339    */
340   @Override
341   public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
342       throws IOException {
343     if (kv.getFamilyLength() == 0) {
344       useBloom = false;
345     }
346
347     boolean haveToSeek = true;
348     if (useBloom) {
349       // check ROWCOL Bloom filter first.
350       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
351         haveToSeek = reader.passesGeneralRowColBloomFilter(kv);
352       } else if (canOptimizeForNonNullColumn
353           && ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
354         // if there is no such delete family kv in the store file,
355         // then no need to seek.
356         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(), kv.getRowOffset(),
357           kv.getRowLength());
358       }
359     }
360
361     delayedReseek = forward;
362     delayedSeekKV = kv;
363
364     if (haveToSeek) {
365       // This row/column might be in this store file (or we did not use the
366       // Bloom filter), so we still need to seek.
367       realSeekDone = false;
368       long maxTimestampInFile = reader.getMaxTimestamp();
369       long seekTimestamp = kv.getTimestamp();
370       if (seekTimestamp > maxTimestampInFile) {
371         // Create a fake key that is not greater than the real next key.
372         // (Lower timestamps correspond to higher KVs.)
373         // To understand this better, consider that we are asked to seek to
374         // a higher timestamp than the max timestamp in this file. We know that
375         // the next point when we have to consider this file again is when we
376         // pass the max timestamp of this file (with the same row/column).
377         setCurrentCell(CellUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
378       } else {
379         // This will be the case e.g. when we need to seek to the next
380         // row/column, and we don't know exactly what they are, so we set the
381         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
382         // row/column.
383         enforceSeek();
384       }
385       return cur != null;
386     }
387
388     // Multi-column Bloom filter optimization.
389     // Create a fake key/value, so that this scanner only bubbles up to the top
390     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
391     // all other store files. The query matcher will then just skip this fake
392     // key/value and the store scanner will progress to the next column. This
393     // is obviously not a "real real" seek, but unlike the fake KV earlier in
394     // this method, we want this to be propagated to ScanQueryMatcher.
395     setCurrentCell(CellUtil.createLastOnRowCol(kv));
396
397     realSeekDone = true;
398     return true;
399   }
400
401   StoreFileReader getReader() {
402     return reader;
403   }
404
405   CellComparator getComparator() {
406     return reader.getComparator();
407   }
408
409   @Override
410   public boolean realSeekDone() {
411     return realSeekDone;
412   }
413
414   @Override
415   public void enforceSeek() throws IOException {
416     if (realSeekDone)
417       return;
418
419     if (delayedReseek) {
420       reseek(delayedSeekKV);
421     } else {
422       seek(delayedSeekKV);
423     }
424   }
425
426   @Override
427   public boolean isFileScanner() {
428     return true;
429   }
430
431   // Test methods
432   static final long getSeekCount() {
433     return seekCount.sum();
434   }
435
436   static final void instrument() {
437     seekCount = new LongAdder();
438   }
439
440   @Override
441   public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
442     // if the file has no entries, no need to validate or create a scanner.
443     byte[] cf = store.getFamily().getName();
444     TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
445     if (timeRange == null) {
446       timeRange = scan.getTimeRange();
447     }
448     return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
449         .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
450   }
451
452   @Override
453   public boolean seekToPreviousRow(Cell originalKey) throws IOException {
454     try {
455       try {
456         boolean keepSeeking = false;
457         Cell key = originalKey;
458         do {
459           Cell seekKey = CellUtil.createFirstOnRow(key);
460           if (seekCount != null) seekCount.increment();
461           if (!hfs.seekBefore(seekKey)) {
462             this.cur = null;
463             return false;
464           }
465           Cell curCell = hfs.getCell();
466           Cell firstKeyOfPreviousRow = CellUtil.createFirstOnRow(curCell);
467
468           if (seekCount != null) seekCount.increment();
469           if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
470             this.cur = null;
471             return false;
472           }
473
474           setCurrentCell(hfs.getCell());
475           this.stopSkippingKVsIfNextRow = true;
476           boolean resultOfSkipKVs;
477           try {
478             resultOfSkipKVs = skipKVsNewerThanReadpoint();
479           } finally {
480             this.stopSkippingKVsIfNextRow = false;
481           }
482           if (!resultOfSkipKVs
483               || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
484             keepSeeking = true;
485             key = firstKeyOfPreviousRow;
486             continue;
487           } else {
488             keepSeeking = false;
489           }
490         } while (keepSeeking);
491         return true;
492       } finally {
493         realSeekDone = true;
494       }
495     } catch (FileNotFoundException e) {
496       throw e;
497     } catch (IOException ioe) {
498       throw new IOException("Could not seekToPreviousRow " + this + " to key "
499           + originalKey, ioe);
500     }
501   }
502
503   @Override
504   public boolean seekToLastRow() throws IOException {
505     byte[] lastRow = reader.getLastRowKey();
506     if (lastRow == null) {
507       return false;
508     }
509     Cell seekKey = CellUtil
510         .createFirstOnRow(lastRow, 0, (short) lastRow.length);
511     if (seek(seekKey)) {
512       return true;
513     } else {
514       return seekToPreviousRow(seekKey);
515     }
516   }
517
518   @Override
519   public boolean backwardSeek(Cell key) throws IOException {
520     seek(key);
521     if (cur == null
522         || getComparator().compareRows(cur, key) > 0) {
523       return seekToPreviousRow(key);
524     }
525     return true;
526   }
527
528   @Override
529   public Cell getNextIndexedKey() {
530     return hfs.getNextIndexedKey();
531   }
532
533   @Override
534   public void shipped() throws IOException {
535     this.hfs.shipped();
536   }
537 }