View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellComparator;
31  import org.apache.hadoop.hbase.CellUtil;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.KeyValueUtil;
35  import org.apache.hadoop.hbase.client.Scan;
36  import org.apache.hadoop.hbase.io.TimeRange;
37  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
38  import org.apache.hadoop.hbase.util.Counter;
39  
40  /**
 * {@link KeyValueScanner} adaptor over a {@link StoreFileReader}. It also consults the
 * reader's Bloom filters so that seeks into this store file can be avoided when possible.
43   */
44  @InterfaceAudience.LimitedPrivate("Coprocessor")
45  public class StoreFileScanner implements KeyValueScanner {
46    // the reader it comes from:
47    private final StoreFileReader reader;
48    private final HFileScanner hfs;
49    private Cell cur = null;
50    private boolean closed = false;
51  
52    private boolean realSeekDone;
53    private boolean delayedReseek;
54    private Cell delayedSeekKV;
55  
56    private boolean enforceMVCC = false;
57    private boolean hasMVCCInfo = false;
58    // A flag represents whether could stop skipping KeyValues for MVCC
59    // if have encountered the next row. Only used for reversed scan
60    private boolean stopSkippingKVsIfNextRow = false;
61  
62    private static Counter seekCount;
63  
64    private ScanQueryMatcher matcher;
65  
66    private long readPt;
67  
/**
 * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}.
 *
 * @param reader the store file reader this scanner belongs to
 * @param hfs HFile scanner used to position within the file
 * @param useMVCC whether cells with a sequence id above {@code readPt} are skipped
 * @param hasMVCC whether the MVCC skip is attempted after each positioning call
 * @param readPt the read point used by the MVCC skip
 */
public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
    boolean hasMVCC, long readPt) {
  // File access.
  this.reader = reader;
  this.hfs = hfs;
  // MVCC configuration.
  this.enforceMVCC = useMVCC;
  this.hasMVCCInfo = hasMVCC;
  this.readPt = readPt;
}
80  
81    boolean isPrimaryReplica() {
82      return reader.isPrimaryReplicaReader();
83    }
84  
85    /**
86     * Return an array of scanners corresponding to the given
87     * set of store files.
88     */
89    public static List<StoreFileScanner> getScannersForStoreFiles(
90        Collection<StoreFile> files,
91        boolean cacheBlocks,
92        boolean usePread, long readPt) throws IOException {
93      return getScannersForStoreFiles(files, cacheBlocks,
94                                     usePread, false, false, readPt);
95    }
96  
97    /**
98     * Return an array of scanners corresponding to the given set of store files.
99     */
100   public static List<StoreFileScanner> getScannersForStoreFiles(
101       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
102       boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
103     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
104         useDropBehind, null, readPt);
105   }
106 
107   /**
108    * Return an array of scanners corresponding to the given set of store files,
109    * And set the ScanQueryMatcher for each store file scanner for further
110    * optimization
111    */
112   public static List<StoreFileScanner> getScannersForStoreFiles(
113       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
114       boolean isCompaction, boolean canUseDrop,
115       ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
116     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
117         files.size());
118     for (StoreFile file : files) {
119       StoreFileReader r = file.createReader(canUseDrop);
120       r.setReplicaStoreFile(isPrimaryReplica);
121       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
122           isCompaction, readPt);
123       scanner.setScanQueryMatcher(matcher);
124       scanners.add(scanner);
125     }
126     return scanners;
127   }
128 
129   public static List<StoreFileScanner> getScannersForStoreFiles(
130     Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
131     boolean isCompaction, boolean canUseDrop,
132     ScanQueryMatcher matcher, long readPt) throws IOException {
133     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
134       matcher, readPt, true);
135   }
136 
137   public String toString() {
138     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
139   }
140 
141   public Cell peek() {
142     return cur;
143   }
144 
145   public Cell next() throws IOException {
146     Cell retKey = cur;
147 
148     try {
149       // only seek if we aren't at the end. cur == null implies 'end'.
150       if (cur != null) {
151         hfs.next();
152         setCurrentCell(hfs.getCell());
153         if (hasMVCCInfo || this.reader.isBulkLoaded()) {
154           skipKVsNewerThanReadpoint();
155         }
156       }
157     } catch (FileNotFoundException e) {
158       throw e;
159     } catch(IOException e) {
160       throw new IOException("Could not iterate " + this, e);
161     }
162     return retKey;
163   }
164 
165   public boolean seek(Cell key) throws IOException {
166     if (seekCount != null) seekCount.increment();
167 
168     try {
169       try {
170         if(!seekAtOrAfter(hfs, key)) {
171           this.cur = null;
172           return false;
173         }
174 
175         setCurrentCell(hfs.getCell());
176 
177         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
178           return skipKVsNewerThanReadpoint();
179         } else {
180           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
181         }
182       } finally {
183         realSeekDone = true;
184       }
185     } catch (FileNotFoundException e) {
186       throw e;
187     } catch (IOException ioe) {
188       throw new IOException("Could not seek " + this + " to key " + key, ioe);
189     }
190   }
191 
192   public boolean reseek(Cell key) throws IOException {
193     if (seekCount != null) seekCount.increment();
194 
195     try {
196       try {
197         if (!reseekAtOrAfter(hfs, key)) {
198           this.cur = null;
199           return false;
200         }
201         setCurrentCell(hfs.getCell());
202 
203         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
204           return skipKVsNewerThanReadpoint();
205         } else {
206           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
207         }
208       } finally {
209         realSeekDone = true;
210       }
211     } catch (FileNotFoundException e) {
212       throw e;
213     } catch (IOException ioe) {
214       throw new IOException("Could not reseek " + this + " to key " + key,
215           ioe);
216     }
217   }
218 
219   protected void setCurrentCell(Cell newVal) throws IOException {
220     this.cur = newVal;
221     if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
222       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
223     }
224   }
225 
226   protected boolean skipKVsNewerThanReadpoint() throws IOException {
227     // We want to ignore all key-values that are newer than our current
228     // readPoint
229     Cell startKV = cur;
230     while(enforceMVCC
231         && cur != null
232         && (cur.getSequenceId() > readPt)) {
233       hfs.next();
234       setCurrentCell(hfs.getCell());
235       if (this.stopSkippingKVsIfNextRow
236           && getComparator().compareRows(cur, startKV) > 0) {
237         return false;
238       }
239     }
240 
241     if (cur == null) {
242       return false;
243     }
244 
245     return true;
246   }
247 
248   public void close() {
249     if (closed) return;
250     cur = null;
251     this.hfs.close();
252     if (this.reader != null) {
253       this.reader.decrementRefCount();
254     }
255     closed = true;
256   }
257 
258   /**
259    *
260    * @param s
261    * @param k
262    * @return false if not found or if k is after the end.
263    * @throws IOException
264    */
265   public static boolean seekAtOrAfter(HFileScanner s, Cell k)
266   throws IOException {
267     int result = s.seekTo(k);
268     if(result < 0) {
269       if (result == HConstants.INDEX_KEY_MAGIC) {
270         // using faked key
271         return true;
272       }
273       // Passed KV is smaller than first KV in file, work from start of file
274       return s.seekTo();
275     } else if(result > 0) {
276       // Passed KV is larger than current KV in file, if there is a next
277       // it is the "after", if not then this scanner is done.
278       return s.next();
279     }
280     // Seeked to the exact key
281     return true;
282   }
283 
284   static boolean reseekAtOrAfter(HFileScanner s, Cell k)
285   throws IOException {
286     //This function is similar to seekAtOrAfter function
287     int result = s.reseekTo(k);
288     if (result <= 0) {
289       if (result == HConstants.INDEX_KEY_MAGIC) {
290         // using faked key
291         return true;
292       }
293       // If up to now scanner is not seeked yet, this means passed KV is smaller
294       // than first KV in file, and it is the first time we seek on this file.
295       // So we also need to work from the start of file.
296       if (!s.isSeeked()) {
297         return  s.seekTo();
298       }
299       return true;
300     }
301     // passed KV is larger than current KV in file, if there is a next
302     // it is after, if not then this scanner is done.
303     return s.next();
304   }
305 
306   @Override
307   public long getSequenceID() {
308     return reader.getSequenceID();
309   }
310 
311   /**
312    * Pretend we have done a seek but don't do it yet, if possible. The hope is
313    * that we find requested columns in more recent files and won't have to seek
314    * in older files. Creates a fake key/value with the given row/column and the
315    * highest (most recent) possible timestamp we might get from this file. When
316    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
317    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
318    * <p>
319    * Note that this function does guarantee that the current KV of this scanner
320    * will be advanced to at least the given KV. Because of this, it does have
321    * to do a real seek in cases when the seek timestamp is older than the
322    * highest timestamp of the file, e.g. when we are trying to seek to the next
323    * row/column and use OLDEST_TIMESTAMP in the seek key.
324    */
325   @Override
326   public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
327       throws IOException {
328     if (kv.getFamilyLength() == 0) {
329       useBloom = false;
330     }
331 
332     boolean haveToSeek = true;
333     if (useBloom) {
334       // check ROWCOL Bloom filter first.
335       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
336         haveToSeek = reader.passesGeneralRowColBloomFilter(kv);
337       } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
338           ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
339         // if there is no such delete family kv in the store file,
340         // then no need to seek.
341         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
342             kv.getRowOffset(), kv.getRowLength());
343       }
344     }
345 
346     delayedReseek = forward;
347     delayedSeekKV = kv;
348 
349     if (haveToSeek) {
350       // This row/column might be in this store file (or we did not use the
351       // Bloom filter), so we still need to seek.
352       realSeekDone = false;
353       long maxTimestampInFile = reader.getMaxTimestamp();
354       long seekTimestamp = kv.getTimestamp();
355       if (seekTimestamp > maxTimestampInFile) {
356         // Create a fake key that is not greater than the real next key.
357         // (Lower timestamps correspond to higher KVs.)
358         // To understand this better, consider that we are asked to seek to
359         // a higher timestamp than the max timestamp in this file. We know that
360         // the next point when we have to consider this file again is when we
361         // pass the max timestamp of this file (with the same row/column).
362         setCurrentCell(CellUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
363       } else {
364         // This will be the case e.g. when we need to seek to the next
365         // row/column, and we don't know exactly what they are, so we set the
366         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
367         // row/column.
368         enforceSeek();
369       }
370       return cur != null;
371     }
372 
373     // Multi-column Bloom filter optimization.
374     // Create a fake key/value, so that this scanner only bubbles up to the top
375     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
376     // all other store files. The query matcher will then just skip this fake
377     // key/value and the store scanner will progress to the next column. This
378     // is obviously not a "real real" seek, but unlike the fake KV earlier in
379     // this method, we want this to be propagated to ScanQueryMatcher.
380     setCurrentCell(CellUtil.createLastOnRowCol(kv));
381 
382     realSeekDone = true;
383     return true;
384   }
385 
386   StoreFileReader getReader() {
387     return reader;
388   }
389 
390   CellComparator getComparator() {
391     return reader.getComparator();
392   }
393 
394   @Override
395   public boolean realSeekDone() {
396     return realSeekDone;
397   }
398 
399   @Override
400   public void enforceSeek() throws IOException {
401     if (realSeekDone)
402       return;
403 
404     if (delayedReseek) {
405       reseek(delayedSeekKV);
406     } else {
407       seek(delayedSeekKV);
408     }
409   }
410 
411   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
412     this.matcher = matcher;
413   }
414 
415   @Override
416   public boolean isFileScanner() {
417     return true;
418   }
419 
420   // Test methods
421 
422   static final long getSeekCount() {
423     return seekCount.get();
424   }
425   static final void instrument() {
426     seekCount = new Counter();
427   }
428 
429   @Override
430   public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
431     // if the file has no entries, no need to validate or create a scanner.
432     byte[] cf = store.getFamily().getName();
433     TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
434     if (timeRange == null) {
435       timeRange = scan.getTimeRange();
436     }
437     return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
438         .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
439   }
440 
441   @Override
442   public boolean seekToPreviousRow(Cell originalKey) throws IOException {
443     try {
444       try {
445         boolean keepSeeking = false;
446         Cell key = originalKey;
447         do {
448           Cell seekKey = CellUtil.createFirstOnRow(key);
449           if (seekCount != null) seekCount.increment();
450           if (!hfs.seekBefore(seekKey)) {
451             this.cur = null;
452             return false;
453           }
454           Cell curCell = hfs.getCell();
455           Cell firstKeyOfPreviousRow = CellUtil.createFirstOnRow(curCell);
456 
457           if (seekCount != null) seekCount.increment();
458           if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
459             this.cur = null;
460             return false;
461           }
462 
463           setCurrentCell(hfs.getCell());
464           this.stopSkippingKVsIfNextRow = true;
465           boolean resultOfSkipKVs;
466           try {
467             resultOfSkipKVs = skipKVsNewerThanReadpoint();
468           } finally {
469             this.stopSkippingKVsIfNextRow = false;
470           }
471           if (!resultOfSkipKVs
472               || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
473             keepSeeking = true;
474             key = firstKeyOfPreviousRow;
475             continue;
476           } else {
477             keepSeeking = false;
478           }
479         } while (keepSeeking);
480         return true;
481       } finally {
482         realSeekDone = true;
483       }
484     } catch (FileNotFoundException e) {
485       throw e;
486     } catch (IOException ioe) {
487       throw new IOException("Could not seekToPreviousRow " + this + " to key "
488           + originalKey, ioe);
489     }
490   }
491 
492   @Override
493   public boolean seekToLastRow() throws IOException {
494     byte[] lastRow = reader.getLastRowKey();
495     if (lastRow == null) {
496       return false;
497     }
498     KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
499     if (seek(seekKey)) {
500       return true;
501     } else {
502       return seekToPreviousRow(seekKey);
503     }
504   }
505 
506   @Override
507   public boolean backwardSeek(Cell key) throws IOException {
508     seek(key);
509     if (cur == null
510         || getComparator().compareRows(cur, key) > 0) {
511       return seekToPreviousRow(key);
512     }
513     return true;
514   }
515 
516   @Override
517   public Cell getNextIndexedKey() {
518     return hfs.getNextIndexedKey();
519   }
520 
521   @Override
522   public void shipped() throws IOException {
523     this.hfs.shipped();
524   }
525 }