View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.List;
28
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellComparator;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.KeyValue;
35  import org.apache.hadoop.hbase.KeyValueUtil;
36  import org.apache.hadoop.hbase.client.Scan;
37  import org.apache.hadoop.hbase.io.TimeRange;
38  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
39  import org.apache.hadoop.hbase.util.Counter;
40
41  /**
42   * KeyValueScanner adaptor over the Reader.  It also provides hooks into
43   * bloom filter things.
44   */
45  @InterfaceAudience.LimitedPrivate("Coprocessor")
46  public class StoreFileScanner implements KeyValueScanner {
47    // the reader it comes from:
48    private final StoreFileReader reader;
49    private final HFileScanner hfs;
50    private Cell cur = null;
51    private boolean closed = false;
52
53    private boolean realSeekDone;
54    private boolean delayedReseek;
55    private Cell delayedSeekKV;
56
57    private boolean enforceMVCC = false;
58    private boolean hasMVCCInfo = false;
59    // A flag represents whether could stop skipping KeyValues for MVCC
60    // if have encountered the next row. Only used for reversed scan
61    private boolean stopSkippingKVsIfNextRow = false;
62
63    private static Counter seekCount;
64
65    private ScanQueryMatcher matcher;
66
67    private long readPt;
68
69    // Order of this scanner relative to other scanners when duplicate key-value is found.
70    // Higher values means scanner has newer data.
71    private long scannerOrder;
72
73    /**
74     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
75     * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
76     * @param readPt MVCC value to use to filter out the updates newer than this scanner.
77     * @param hasMVCC Set to true if underlying store file reader has MVCC info.
78     */
79    public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
80        boolean hasMVCC, long readPt) {
81      this (reader, hfs, useMVCC, hasMVCC, readPt, 0);
82    }
83
84    /**
85     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
86     * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
87     * @param readPt MVCC value to use to filter out the updates newer than this scanner.
88     * @param hasMVCC Set to true if underlying store file reader has MVCC info.
89     * @param scannerOrder Order of the scanner relative to other scanners.
90     *   See {@link KeyValueScanner#getScannerOrder()}.
91     */
92    public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
93        boolean hasMVCC, long readPt, long scannerOrder) {
94      this.readPt = readPt;
95      this.reader = reader;
96      this.hfs = hfs;
97      this.enforceMVCC = useMVCC;
98      this.hasMVCCInfo = hasMVCC;
99      this.scannerOrder = scannerOrder;
100   }
101
102   boolean isPrimaryReplica() {
103     return reader.isPrimaryReplicaReader();
104   }
105
106   /**
107    * Return an array of scanners corresponding to the given
108    * set of store files.
109    */
110   public static List<StoreFileScanner> getScannersForStoreFiles(
111       Collection<StoreFile> files,
112       boolean cacheBlocks,
113       boolean usePread, long readPt) throws IOException {
114     return getScannersForStoreFiles(files, cacheBlocks,
115                                    usePread, false, false, readPt);
116   }
117
118   /**
119    * Return an array of scanners corresponding to the given set of store files.
120    */
121   public static List<StoreFileScanner> getScannersForStoreFiles(
122       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
123       boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
124     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
125         useDropBehind, null, readPt);
126   }
127
128   /**
129    * Return an array of scanners corresponding to the given set of store files,
130    * And set the ScanQueryMatcher for each store file scanner for further
131    * optimization
132    */
133   public static List<StoreFileScanner> getScannersForStoreFiles(
134       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
135       boolean isCompaction, boolean canUseDrop,
136       ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
137     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
138         files.size());
139     List<StoreFile> sorted_files = new ArrayList<>(files);
140     Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID);
141     for (int i = 0; i < sorted_files.size(); i++) {
142       StoreFileReader r = sorted_files.get(i).createReader();
143       r.setReplicaStoreFile(isPrimaryReplica);
144       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
145           isCompaction, readPt, i);
146       scanner.setScanQueryMatcher(matcher);
147       scanners.add(scanner);
148     }
149     return scanners;
150   }
151
152   public static List<StoreFileScanner> getScannersForStoreFiles(
153     Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
154     boolean isCompaction, boolean canUseDrop,
155     ScanQueryMatcher matcher, long readPt) throws IOException {
156     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
157       matcher, readPt, true);
158   }
159
160   public String toString() {
161     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
162   }
163
164   public Cell peek() {
165     return cur;
166   }
167 
168   public Cell next() throws IOException {
169     Cell retKey = cur;
170
171     try {
172       // only seek if we aren't at the end. cur == null implies 'end'.
173       if (cur != null) {
174         hfs.next();
175         setCurrentCell(hfs.getCell());
176         if (hasMVCCInfo || this.reader.isBulkLoaded()) {
177           skipKVsNewerThanReadpoint();
178         }
179       }
180     } catch (FileNotFoundException e) {
181       throw e;
182     } catch(IOException e) {
183       throw new IOException("Could not iterate " + this, e);
184     }
185     return retKey;
186   }
187
188   public boolean seek(Cell key) throws IOException {
189     if (seekCount != null) seekCount.increment();
190
191     try {
192       try {
193         if(!seekAtOrAfter(hfs, key)) {
194           this.cur = null;
195           return false;
196         }
197
198         setCurrentCell(hfs.getCell());
199
200         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
201           return skipKVsNewerThanReadpoint();
202         } else {
203           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
204         }
205       } finally {
206         realSeekDone = true;
207       }
208     } catch (FileNotFoundException e) {
209       throw e;
210     } catch (IOException ioe) {
211       throw new IOException("Could not seek " + this + " to key " + key, ioe);
212     }
213   }
214
215   public boolean reseek(Cell key) throws IOException {
216     if (seekCount != null) seekCount.increment();
217
218     try {
219       try {
220         if (!reseekAtOrAfter(hfs, key)) {
221           this.cur = null;
222           return false;
223         }
224         setCurrentCell(hfs.getCell());
225 
226         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
227           return skipKVsNewerThanReadpoint();
228         } else {
229           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
230         }
231       } finally {
232         realSeekDone = true;
233       }
234     } catch (FileNotFoundException e) {
235       throw e;
236     } catch (IOException ioe) {
237       throw new IOException("Could not reseek " + this + " to key " + key,
238           ioe);
239     }
240   }
241
242   protected void setCurrentCell(Cell newVal) throws IOException {
243     this.cur = newVal;
244     if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
245       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
246     }
247   }
248
249   protected boolean skipKVsNewerThanReadpoint() throws IOException {
250     // We want to ignore all key-values that are newer than our current
251     // readPoint
252     Cell startKV = cur;
253     while(enforceMVCC
254         && cur != null
255         && (cur.getSequenceId() > readPt)) {
256       boolean hasNext = hfs.next();
257       setCurrentCell(hfs.getCell());
258       if (hasNext && this.stopSkippingKVsIfNextRow
259           && getComparator().compareRows(cur, startKV) > 0) {
260         return false;
261       }
262     }
263
264     if (cur == null) {
265       return false;
266     }
267
268     return true;
269   }
270
271   public void close() {
272     if (closed) return;
273     cur = null;
274     this.hfs.close();
275     if (this.reader != null) {
276       this.reader.decrementRefCount();
277     }
278     closed = true;
279   }
280
281   /**
282    *
283    * @param s
284    * @param k
285    * @return false if not found or if k is after the end.
286    * @throws IOException
287    */
288   public static boolean seekAtOrAfter(HFileScanner s, Cell k)
289   throws IOException {
290     int result = s.seekTo(k);
291     if(result < 0) {
292       if (result == HConstants.INDEX_KEY_MAGIC) {
293         // using faked key
294         return true;
295       }
296       // Passed KV is smaller than first KV in file, work from start of file
297       return s.seekTo();
298     } else if(result > 0) {
299       // Passed KV is larger than current KV in file, if there is a next
300       // it is the "after", if not then this scanner is done.
301       return s.next();
302     }
303     // Seeked to the exact key
304     return true;
305   }
306
307   static boolean reseekAtOrAfter(HFileScanner s, Cell k)
308   throws IOException {
309     //This function is similar to seekAtOrAfter function
310     int result = s.reseekTo(k);
311     if (result <= 0) {
312       if (result == HConstants.INDEX_KEY_MAGIC) {
313         // using faked key
314         return true;
315       }
316       // If up to now scanner is not seeked yet, this means passed KV is smaller
317       // than first KV in file, and it is the first time we seek on this file.
318       // So we also need to work from the start of file.
319       if (!s.isSeeked()) {
320         return  s.seekTo();
321       }
322       return true;
323     }
324     // passed KV is larger than current KV in file, if there is a next
325     // it is after, if not then this scanner is done.
326     return s.next();
327   }
328
329   /**
330    * @see KeyValueScanner#getScannerOrder()
331    */
332   @Override
333   public long getScannerOrder() {
334     return scannerOrder;
335   }
336
337   /**
338    * Pretend we have done a seek but don't do it yet, if possible. The hope is
339    * that we find requested columns in more recent files and won't have to seek
340    * in older files. Creates a fake key/value with the given row/column and the
341    * highest (most recent) possible timestamp we might get from this file. When
342    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
343    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
344    * <p>
345    * Note that this function does guarantee that the current KV of this scanner
346    * will be advanced to at least the given KV. Because of this, it does have
347    * to do a real seek in cases when the seek timestamp is older than the
348    * highest timestamp of the file, e.g. when we are trying to seek to the next
349    * row/column and use OLDEST_TIMESTAMP in the seek key.
350    */
351   @Override
352   public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
353       throws IOException {
354     if (kv.getFamilyLength() == 0) {
355       useBloom = false;
356     }
357
358     boolean haveToSeek = true;
359     if (useBloom) {
360       // check ROWCOL Bloom filter first.
361       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
362         haveToSeek = reader.passesGeneralRowColBloomFilter(kv);
363       } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
364           ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
365         // if there is no such delete family kv in the store file,
366         // then no need to seek.
367         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
368             kv.getRowOffset(), kv.getRowLength());
369       }
370     }
371
372     delayedReseek = forward;
373     delayedSeekKV = kv;
374
375     if (haveToSeek) {
376       // This row/column might be in this store file (or we did not use the
377       // Bloom filter), so we still need to seek.
378       realSeekDone = false;
379       long maxTimestampInFile = reader.getMaxTimestamp();
380       long seekTimestamp = kv.getTimestamp();
381       if (seekTimestamp > maxTimestampInFile) {
382         // Create a fake key that is not greater than the real next key.
383         // (Lower timestamps correspond to higher KVs.)
384         // To understand this better, consider that we are asked to seek to
385         // a higher timestamp than the max timestamp in this file. We know that
386         // the next point when we have to consider this file again is when we
387         // pass the max timestamp of this file (with the same row/column).
388         setCurrentCell(CellUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
389       } else {
390         // This will be the case e.g. when we need to seek to the next
391         // row/column, and we don't know exactly what they are, so we set the
392         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
393         // row/column.
394         enforceSeek();
395       }
396       return cur != null;
397     }
398 
399     // Multi-column Bloom filter optimization.
400     // Create a fake key/value, so that this scanner only bubbles up to the top
401     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
402     // all other store files. The query matcher will then just skip this fake
403     // key/value and the store scanner will progress to the next column. This
404     // is obviously not a "real real" seek, but unlike the fake KV earlier in
405     // this method, we want this to be propagated to ScanQueryMatcher.
406     setCurrentCell(CellUtil.createLastOnRowCol(kv));
407
408     realSeekDone = true;
409     return true;
410   }
411
412   StoreFileReader getReader() {
413     return reader;
414   }
415
416   CellComparator getComparator() {
417     return reader.getComparator();
418   }
419 
420   @Override
421   public boolean realSeekDone() {
422     return realSeekDone;
423   }
424
425   @Override
426   public void enforceSeek() throws IOException {
427     if (realSeekDone)
428       return;
429
430     if (delayedReseek) {
431       reseek(delayedSeekKV);
432     } else {
433       seek(delayedSeekKV);
434     }
435   }
436
437   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
438     this.matcher = matcher;
439   }
440 
441   @Override
442   public boolean isFileScanner() {
443     return true;
444   }
445
446   // Test methods
447
448   static final long getSeekCount() {
449     return seekCount.get();
450   }
451   static final void instrument() {
452     seekCount = new Counter();
453   }
454
455   @Override
456   public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
457     // if the file has no entries, no need to validate or create a scanner.
458     byte[] cf = store.getFamily().getName();
459     TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
460     if (timeRange == null) {
461       timeRange = scan.getTimeRange();
462     }
463     return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
464         .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
465   }
466
467   @Override
468   public boolean seekToPreviousRow(Cell originalKey) throws IOException {
469     try {
470       try {
471         boolean keepSeeking = false;
472         Cell key = originalKey;
473         do {
474           Cell seekKey = CellUtil.createFirstOnRow(key);
475           if (seekCount != null) seekCount.increment();
476           if (!hfs.seekBefore(seekKey)) {
477             this.cur = null;
478             return false;
479           }
480           Cell curCell = hfs.getCell();
481           Cell firstKeyOfPreviousRow = CellUtil.createFirstOnRow(curCell);
482
483           if (seekCount != null) seekCount.increment();
484           if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
485             this.cur = null;
486             return false;
487           }
488
489           setCurrentCell(hfs.getCell());
490           this.stopSkippingKVsIfNextRow = true;
491           boolean resultOfSkipKVs;
492           try {
493             resultOfSkipKVs = skipKVsNewerThanReadpoint();
494           } finally {
495             this.stopSkippingKVsIfNextRow = false;
496           }
497           if (!resultOfSkipKVs
498               || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
499             keepSeeking = true;
500             key = firstKeyOfPreviousRow;
501             continue;
502           } else {
503             keepSeeking = false;
504           }
505         } while (keepSeeking);
506         return true;
507       } finally {
508         realSeekDone = true;
509       }
510     } catch (FileNotFoundException e) {
511       throw e;
512     } catch (IOException ioe) {
513       throw new IOException("Could not seekToPreviousRow " + this + " to key "
514           + originalKey, ioe);
515     }
516   }
517
518   @Override
519   public boolean seekToLastRow() throws IOException {
520     byte[] lastRow = reader.getLastRowKey();
521     if (lastRow == null) {
522       return false;
523     }
524     KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
525     if (seek(seekKey)) {
526       return true;
527     } else {
528       return seekToPreviousRow(seekKey);
529     }
530   }
531
532   @Override
533   public boolean backwardSeek(Cell key) throws IOException {
534     seek(key);
535     if (cur == null
536         || getComparator().compareRows(cur, key) > 0) {
537       return seekToPreviousRow(key);
538     }
539     return true;
540   }
541
542   @Override
543   public Cell getNextIndexedKey() {
544     return hfs.getNextIndexedKey();
545   }
546
547   @Override
548   public void shipped() throws IOException {
549     this.hfs.shipped();
550   }
551 }