View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellComparator;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.KeyValue;
35  import org.apache.hadoop.hbase.KeyValueUtil;
36  import org.apache.hadoop.hbase.client.Scan;
37  import org.apache.hadoop.hbase.io.TimeRange;
38  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
39  import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
40  
41  /**
42   * KeyValueScanner adaptor over the Reader.  It also provides hooks into
43   * bloom filter things.
44   */
45  @InterfaceAudience.LimitedPrivate("Coprocessor")
46  public class StoreFileScanner implements KeyValueScanner {
  // the reader it comes from:
  private final StoreFile.Reader reader;
  // Underlying HFile scanner that performs the actual seeks and iteration.
  private final HFileScanner hfs;
  // Cell currently exposed through peek(); null means the scanner is at its end.
  private Cell cur = null;
  // Set by close() so that repeated close() calls are no-ops.
  private boolean closed = false;

  // Lazy-seek state: realSeekDone is cleared by requestSeek() while a seek is
  // still pending and is set again once seek()/reseek() has actually run.
  private boolean realSeekDone;
  // Whether the pending lazy seek must be carried out with reseek() rather than seek().
  private boolean delayedReseek;
  // Key of the pending lazy seek.
  private Cell delayedSeekKV;

  // When true, cells whose sequence id is greater than readPt are skipped.
  private boolean enforceMVCC = false;
  // When true, seek()/reseek()/next() run the MVCC skip after positioning
  // (it is also run for bulk-loaded files).
  private boolean hasMVCCInfo = false;
  // Flag telling the MVCC skip loop that it may stop skipping cells as soon as
  // it reaches the next row. Only used for reversed scans.
  private boolean stopSkippingKVsIfNextRow = false;

  // Seek counter for tests; stays null until instrument() is called.
  private static AtomicLong seekCount;

  // Optional query matcher consulted by requestSeek() for the delete-family
  // Bloom filter check; may be null.
  private ScanQueryMatcher matcher;

  // Read point that cell sequence ids are compared against during the MVCC skip.
  private long readPt;
68  
69    /**
70     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
71     * @param hfs HFile scanner
72     */
73    public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
74        boolean hasMVCC, long readPt) {
75      this.readPt = readPt;
76      this.reader = reader;
77      this.hfs = hfs;
78      this.enforceMVCC = useMVCC;
79      this.hasMVCCInfo = hasMVCC;
80    }
81  
82    boolean isPrimaryReplica() {
83      return reader.isPrimaryReplicaReader();
84    }
85  
86    /**
87     * Return an array of scanners corresponding to the given
88     * set of store files.
89     */
90    public static List<StoreFileScanner> getScannersForStoreFiles(
91        Collection<StoreFile> files,
92        boolean cacheBlocks,
93        boolean usePread, long readPt) throws IOException {
94      return getScannersForStoreFiles(files, cacheBlocks,
95                                     usePread, false, false, readPt);
96    }
97  
98    /**
99     * Return an array of scanners corresponding to the given set of store files.
100    */
101   public static List<StoreFileScanner> getScannersForStoreFiles(
102       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
103       boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
104     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
105         useDropBehind, null, readPt);
106   }
107 
108   /**
109    * Return an array of scanners corresponding to the given set of store files,
110    * And set the ScanQueryMatcher for each store file scanner for further
111    * optimization
112    */
113   public static List<StoreFileScanner> getScannersForStoreFiles(
114       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
115       boolean isCompaction, boolean canUseDrop,
116       ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
117     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
118         files.size());
119     for (StoreFile file : files) {
120       StoreFile.Reader r = file.createReader(canUseDrop);
121       r.setReplicaStoreFile(isPrimaryReplica);
122       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
123           isCompaction, readPt);
124       scanner.setScanQueryMatcher(matcher);
125       scanners.add(scanner);
126     }
127     return scanners;
128   }
129 
130   public static List<StoreFileScanner> getScannersForStoreFiles(
131     Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
132     boolean isCompaction, boolean canUseDrop,
133     ScanQueryMatcher matcher, long readPt) throws IOException {
134     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
135       matcher, readPt, true);
136   }
137 
138   public String toString() {
139     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
140   }
141 
142   public Cell peek() {
143     return cur;
144   }
145 
146   public Cell next() throws IOException {
147     Cell retKey = cur;
148 
149     try {
150       // only seek if we aren't at the end. cur == null implies 'end'.
151       if (cur != null) {
152         hfs.next();
153         setCurrentCell(hfs.getCell());
154         if (hasMVCCInfo || this.reader.isBulkLoaded()) {
155           skipKVsNewerThanReadpoint();
156         }
157       }
158     } catch (FileNotFoundException e) {
159       throw e;
160     } catch(IOException e) {
161       throw new IOException("Could not iterate " + this, e);
162     }
163     return retKey;
164   }
165 
166   public boolean seek(Cell key) throws IOException {
167     if (seekCount != null) seekCount.incrementAndGet();
168 
169     try {
170       try {
171         if(!seekAtOrAfter(hfs, key)) {
172           this.cur = null;
173           return false;
174         }
175 
176         setCurrentCell(hfs.getCell());
177 
178         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
179           return skipKVsNewerThanReadpoint();
180         } else {
181           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
182         }
183       } finally {
184         realSeekDone = true;
185       }
186     } catch (FileNotFoundException e) {
187       throw e;
188     } catch (IOException ioe) {
189       throw new IOException("Could not seek " + this + " to key " + key, ioe);
190     }
191   }
192 
193   public boolean reseek(Cell key) throws IOException {
194     if (seekCount != null) seekCount.incrementAndGet();
195 
196     try {
197       try {
198         if (!reseekAtOrAfter(hfs, key)) {
199           this.cur = null;
200           return false;
201         }
202         setCurrentCell(hfs.getCell());
203 
204         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
205           return skipKVsNewerThanReadpoint();
206         } else {
207           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
208         }
209       } finally {
210         realSeekDone = true;
211       }
212     } catch (FileNotFoundException e) {
213       throw e;
214     } catch (IOException ioe) {
215       throw new IOException("Could not reseek " + this + " to key " + key,
216           ioe);
217     }
218   }
219 
220   protected void setCurrentCell(Cell newVal) throws IOException {
221     this.cur = newVal;
222     if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
223       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
224     }
225   }
226 
227   protected boolean skipKVsNewerThanReadpoint() throws IOException {
228     // We want to ignore all key-values that are newer than our current
229     // readPoint
230     Cell startKV = cur;
231     while(enforceMVCC
232         && cur != null
233         && (cur.getSequenceId() > readPt)) {
234       hfs.next();
235       setCurrentCell(hfs.getCell());
236       if (this.stopSkippingKVsIfNextRow
237           && getComparator().compareRows(cur, startKV) > 0) {
238         return false;
239       }
240     }
241 
242     if (cur == null) {
243       return false;
244     }
245 
246     return true;
247   }
248 
249   public void close() {
250     if (closed) return;
251     cur = null;
252     this.hfs.close();
253     if (this.reader != null) {
254       this.reader.decrementRefCount();
255     }
256     closed = true;
257   }
258 
259   /**
260    *
261    * @param s
262    * @param k
263    * @return false if not found or if k is after the end.
264    * @throws IOException
265    */
266   public static boolean seekAtOrAfter(HFileScanner s, Cell k)
267   throws IOException {
268     int result = s.seekTo(k);
269     if(result < 0) {
270       if (result == HConstants.INDEX_KEY_MAGIC) {
271         // using faked key
272         return true;
273       }
274       // Passed KV is smaller than first KV in file, work from start of file
275       return s.seekTo();
276     } else if(result > 0) {
277       // Passed KV is larger than current KV in file, if there is a next
278       // it is the "after", if not then this scanner is done.
279       return s.next();
280     }
281     // Seeked to the exact key
282     return true;
283   }
284 
285   static boolean reseekAtOrAfter(HFileScanner s, Cell k)
286   throws IOException {
287     //This function is similar to seekAtOrAfter function
288     int result = s.reseekTo(k);
289     if (result <= 0) {
290       if (result == HConstants.INDEX_KEY_MAGIC) {
291         // using faked key
292         return true;
293       }
294       // If up to now scanner is not seeked yet, this means passed KV is smaller
295       // than first KV in file, and it is the first time we seek on this file.
296       // So we also need to work from the start of file.
297       if (!s.isSeeked()) {
298         return  s.seekTo();
299       }
300       return true;
301     }
302     // passed KV is larger than current KV in file, if there is a next
303     // it is after, if not then this scanner is done.
304     return s.next();
305   }
306 
307   @Override
308   public long getSequenceID() {
309     return reader.getSequenceID();
310   }
311 
  /**
   * Pretend we have done a seek but don't do it yet, if possible. The hope is
   * that we find requested columns in more recent files and won't have to seek
   * in older files. Creates a fake key/value with the given row/column and the
   * highest (most recent) possible timestamp we might get from this file. When
   * users of such "lazy scanner" need to know the next KV precisely (e.g. when
   * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
   * <p>
   * Note that this function does guarantee that the current KV of this scanner
   * will be advanced to at least the given KV. Because of this, it does have
   * to do a real seek in cases when the seek timestamp is older than the
   * highest timestamp of the file, e.g. when we are trying to seek to the next
   * row/column and use OLDEST_TIMESTAMP in the seek key.
   *
   * @param kv key to seek to
   * @param forward remembered for the delayed seek: when true the real seek is
   *          later done with {@link #reseek(Cell)}, otherwise with
   *          {@link #seek(Cell)}
   * @param useBloom whether Bloom filters may be consulted to avoid the seek;
   *          ignored when {@code kv} has an empty family
   * @return true if the scanner has a current (possibly fake) cell after the
   *         call, false otherwise
   * @throws IOException if a real seek had to be done and failed
   */
  @Override
  public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
      throws IOException {
    // A key without a family never goes through the Bloom filter checks.
    if (kv.getFamilyLength() == 0) {
      useBloom = false;
    }

    boolean haveToSeek = true;
    if (useBloom) {
      // check ROWCOL Bloom filter first.
      if (reader.getBloomFilterType() == BloomType.ROWCOL) {
        haveToSeek = reader.passesGeneralRowColBloomFilter(kv);
      } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
          ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
        // if there is no such delete family kv in the store file,
        // then no need to seek.
        haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
            kv.getRowOffset(), kv.getRowLength());
      }
    }

    // Remember how and where to seek so that enforceSeek() can do it later.
    delayedReseek = forward;
    delayedSeekKV = kv;

    if (haveToSeek) {
      // This row/column might be in this store file (or we did not use the
      // Bloom filter), so we still need to seek.
      realSeekDone = false;
      long maxTimestampInFile = reader.getMaxTimestamp();
      long seekTimestamp = kv.getTimestamp();
      if (seekTimestamp > maxTimestampInFile) {
        // Create a fake key that is not greater than the real next key.
        // (Lower timestamps correspond to higher KVs.)
        // To understand this better, consider that we are asked to seek to
        // a higher timestamp than the max timestamp in this file. We know that
        // the next point when we have to consider this file again is when we
        // pass the max timestamp of this file (with the same row/column).
        setCurrentCell(CellUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
      } else {
        // This will be the case e.g. when we need to seek to the next
        // row/column, and we don't know exactly what they are, so we set the
        // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
        // row/column.
        enforceSeek();
      }
      return cur != null;
    }

    // Multi-column Bloom filter optimization.
    // Create a fake key/value, so that this scanner only bubbles up to the top
    // of the KeyValueHeap in StoreScanner after we scanned this row/column in
    // all other store files. The query matcher will then just skip this fake
    // key/value and the store scanner will progress to the next column. This
    // is obviously not a "real real" seek, but unlike the fake KV earlier in
    // this method, we want this to be propagated to ScanQueryMatcher.
    setCurrentCell(CellUtil.createLastOnRowCol(kv));

    // No real seek is pending for this fake key, so enforceSeek() must not run one.
    realSeekDone = true;
    return true;
  }
386 
387   Reader getReader() {
388     return reader;
389   }
390 
391   CellComparator getComparator() {
392     return reader.getComparator();
393   }
394 
395   @Override
396   public boolean realSeekDone() {
397     return realSeekDone;
398   }
399 
400   @Override
401   public void enforceSeek() throws IOException {
402     if (realSeekDone)
403       return;
404 
405     if (delayedReseek) {
406       reseek(delayedSeekKV);
407     } else {
408       seek(delayedSeekKV);
409     }
410   }
411 
412   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
413     this.matcher = matcher;
414   }
415 
416   @Override
417   public boolean isFileScanner() {
418     return true;
419   }
420 
421   // Test methods
422 
423   static final long getSeekCount() {
424     return seekCount.get();
425   }
426   static final void instrument() {
427     seekCount = new AtomicLong();
428   }
429 
430   @Override
431   public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
432     // if the file has no entries, no need to validate or create a scanner.
433     byte[] cf = store.getFamily().getName();
434     TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
435     if (timeRange == null) {
436       timeRange = scan.getTimeRange();
437     }
438     return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
439         .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
440   }
441 
  /**
   * Positions the scanner on the first visible cell of the closest row before
   * the row of the given key, walking further back row by row while the
   * candidate row has no cell that survives the MVCC skip.
   *
   * @param originalKey key whose row the scanner must move before
   * @return true if the scanner is left on a cell of an earlier row, false if
   *         no such row could be reached in this file
   * @throws IOException if the underlying scanner fails; a
   *         {@link FileNotFoundException} is rethrown as is, any other
   *         IOException is wrapped with a message naming this scanner and key
   */
  @Override
  public boolean seekToPreviousRow(Cell originalKey) throws IOException {
    try {
      try {
        boolean keepSeeking = false;
        Cell key = originalKey;
        do {
          // Step 1: move before the first possible key of the current row.
          Cell seekKey = CellUtil.createFirstOnRow(key);
          if (seekCount != null) seekCount.incrementAndGet();
          if (!hfs.seekBefore(seekKey)) {
            this.cur = null;
            return false;
          }
          // Step 2: the cell we landed on belongs to the previous row; seek to
          // the first possible key of that row.
          Cell curCell = hfs.getCell();
          Cell firstKeyOfPreviousRow = CellUtil.createFirstOnRow(curCell);

          if (seekCount != null) seekCount.incrementAndGet();
          if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
            this.cur = null;
            return false;
          }

          // Step 3: skip cells newer than the read point, but let the skip
          // stop as soon as it runs into the next row. The flag is always
          // reset so it only affects this call.
          setCurrentCell(hfs.getCell());
          this.stopSkippingKVsIfNextRow = true;
          boolean resultOfSkipKVs;
          try {
            resultOfSkipKVs = skipKVsNewerThanReadpoint();
          } finally {
            this.stopSkippingKVsIfNextRow = false;
          }
          // Nothing visible in that row (or the skip ended on a later row):
          // repeat from the row just examined to go one more row back.
          if (!resultOfSkipKVs
              || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
            keepSeeking = true;
            key = firstKeyOfPreviousRow;
            continue;
          } else {
            keepSeeking = false;
          }
        } while (keepSeeking);
        return true;
      } finally {
        // Whatever the outcome, a real seek has now been attempted.
        realSeekDone = true;
      }
    } catch (FileNotFoundException e) {
      throw e;
    } catch (IOException ioe) {
      throw new IOException("Could not seekToPreviousRow " + this + " to key "
          + originalKey, ioe);
    }
  }
492 
493   @Override
494   public boolean seekToLastRow() throws IOException {
495     byte[] lastRow = reader.getLastRowKey();
496     if (lastRow == null) {
497       return false;
498     }
499     KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
500     if (seek(seekKey)) {
501       return true;
502     } else {
503       return seekToPreviousRow(seekKey);
504     }
505   }
506 
507   @Override
508   public boolean backwardSeek(Cell key) throws IOException {
509     seek(key);
510     if (cur == null
511         || getComparator().compareRows(cur, key) > 0) {
512       return seekToPreviousRow(key);
513     }
514     return true;
515   }
516 
517   @Override
518   public Cell getNextIndexedKey() {
519     return hfs.getNextIndexedKey();
520   }
521 
522   @Override
523   public void shipped() throws IOException {
524     this.hfs.shipped();
525   }
526 }