View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.SortedSet;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellComparator;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
39  import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
40  
41  /**
 * KeyValueScanner adaptor over a {@link StoreFile.Reader}. It also consults the reader's
 * Bloom filters when deciding whether a requested seek has to be performed at all.
44   */
45  @InterfaceAudience.LimitedPrivate("Coprocessor")
46  public class StoreFileScanner implements KeyValueScanner {
47    // the reader it comes from:
48    private final StoreFile.Reader reader;
49    private final HFileScanner hfs;
50    private Cell cur = null;
51  
52    private boolean realSeekDone;
53    private boolean delayedReseek;
54    private Cell delayedSeekKV;
55  
56    private boolean enforceMVCC = false;
57    private boolean hasMVCCInfo = false;
58    // A flag represents whether could stop skipping KeyValues for MVCC
59    // if have encountered the next row. Only used for reversed scan
60    private boolean stopSkippingKVsIfNextRow = false;
61  
62    private static AtomicLong seekCount;
63  
64    private ScanQueryMatcher matcher;
65    
66    private long readPt;
67  
68    /**
69     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
70     * @param hfs HFile scanner
71     */
72    public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
73        boolean hasMVCC, long readPt) {
74      this.readPt = readPt;
75      this.reader = reader;
76      this.hfs = hfs;
77      this.enforceMVCC = useMVCC;
78      this.hasMVCCInfo = hasMVCC;
79    }
80  
81    /**
82     * Return an array of scanners corresponding to the given
83     * set of store files.
84     */
85    public static List<StoreFileScanner> getScannersForStoreFiles(
86        Collection<StoreFile> files,
87        boolean cacheBlocks,
88        boolean usePread, long readPt) throws IOException {
89      return getScannersForStoreFiles(files, cacheBlocks,
90                                     usePread, false, readPt);
91    }
92  
93    /**
94     * Return an array of scanners corresponding to the given set of store files.
95     */
96    public static List<StoreFileScanner> getScannersForStoreFiles(
97        Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
98        boolean isCompaction, long readPt) throws IOException {
99      return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
100         null, readPt);
101   }
102 
103   /**
104    * Return an array of scanners corresponding to the given set of store files,
105    * And set the ScanQueryMatcher for each store file scanner for further
106    * optimization
107    */
108   public static List<StoreFileScanner> getScannersForStoreFiles(
109       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
110       boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
111     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
112         files.size());
113     for (StoreFile file : files) {
114       StoreFile.Reader r = file.createReader();
115       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
116           isCompaction, readPt);
117       scanner.setScanQueryMatcher(matcher);
118       scanners.add(scanner);
119     }
120     return scanners;
121   }
122 
123   public String toString() {
124     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
125   }
126 
127   public Cell peek() {
128     return cur;
129   }
130 
131   public Cell next() throws IOException {
132     Cell retKey = cur;
133 
134     try {
135       // only seek if we aren't at the end. cur == null implies 'end'.
136       if (cur != null) {
137         hfs.next();
138         setCurrentCell(hfs.getCell());
139         if (hasMVCCInfo || this.reader.isBulkLoaded()) {
140           skipKVsNewerThanReadpoint();
141         }
142       }
143     } catch (FileNotFoundException e) {
144       throw e;
145     } catch(IOException e) {
146       throw new IOException("Could not iterate " + this, e);
147     }
148     return retKey;
149   }
150 
151   public boolean seek(Cell key) throws IOException {
152     if (seekCount != null) seekCount.incrementAndGet();
153 
154     try {
155       try {
156         if(!seekAtOrAfter(hfs, key)) {
157           this.cur = null;
158           return false;
159         }
160 
161         setCurrentCell(hfs.getCell());
162 
163         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
164           return skipKVsNewerThanReadpoint();
165         } else {
166           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
167         }
168       } finally {
169         realSeekDone = true;
170       }
171     } catch (FileNotFoundException e) {
172       throw e;
173     } catch (IOException ioe) {
174       throw new IOException("Could not seek " + this + " to key " + key, ioe);
175     }
176   }
177 
178   public boolean reseek(Cell key) throws IOException {
179     if (seekCount != null) seekCount.incrementAndGet();
180 
181     try {
182       try {
183         if (!reseekAtOrAfter(hfs, key)) {
184           this.cur = null;
185           return false;
186         }
187         setCurrentCell(hfs.getCell());
188 
189         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
190           return skipKVsNewerThanReadpoint();
191         } else {
192           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
193         }
194       } finally {
195         realSeekDone = true;
196       }
197     } catch (FileNotFoundException e) {
198       throw e;
199     } catch (IOException ioe) {
200       throw new IOException("Could not reseek " + this + " to key " + key,
201           ioe);
202     }
203   }
204 
205   protected void setCurrentCell(Cell newVal) throws IOException {
206     this.cur = newVal;
207     if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
208       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
209     }
210   }
211 
212   protected boolean skipKVsNewerThanReadpoint() throws IOException {
213     // We want to ignore all key-values that are newer than our current
214     // readPoint
215     Cell startKV = cur;
216     while(enforceMVCC
217         && cur != null
218         && (cur.getSequenceId() > readPt)) {
219       hfs.next();
220       setCurrentCell(hfs.getCell());
221       if (this.stopSkippingKVsIfNextRow
222           && getComparator().compareRows(cur, startKV) > 0) {
223         return false;
224       }
225     }
226 
227     if (cur == null) {
228       return false;
229     }
230 
231     return true;
232   }
233 
234   public void close() {
235     cur = null;
236     this.hfs.close();
237   }
238 
239   /**
240    *
241    * @param s
242    * @param k
243    * @return false if not found or if k is after the end.
244    * @throws IOException
245    */
246   public static boolean seekAtOrAfter(HFileScanner s, Cell k)
247   throws IOException {
248     int result = s.seekTo(k);
249     if(result < 0) {
250       if (result == HConstants.INDEX_KEY_MAGIC) {
251         // using faked key
252         return true;
253       }
254       // Passed KV is smaller than first KV in file, work from start of file
255       return s.seekTo();
256     } else if(result > 0) {
257       // Passed KV is larger than current KV in file, if there is a next
258       // it is the "after", if not then this scanner is done.
259       return s.next();
260     }
261     // Seeked to the exact key
262     return true;
263   }
264 
265   static boolean reseekAtOrAfter(HFileScanner s, Cell k)
266   throws IOException {
267     //This function is similar to seekAtOrAfter function
268     int result = s.reseekTo(k);
269     if (result <= 0) {
270       if (result == HConstants.INDEX_KEY_MAGIC) {
271         // using faked key
272         return true;
273       }
274       // If up to now scanner is not seeked yet, this means passed KV is smaller
275       // than first KV in file, and it is the first time we seek on this file.
276       // So we also need to work from the start of file.
277       if (!s.isSeeked()) {
278         return  s.seekTo();
279       }
280       return true;
281     }
282     // passed KV is larger than current KV in file, if there is a next
283     // it is after, if not then this scanner is done.
284     return s.next();
285   }
286 
287   @Override
288   public long getSequenceID() {
289     return reader.getSequenceID();
290   }
291 
292   /**
293    * Pretend we have done a seek but don't do it yet, if possible. The hope is
294    * that we find requested columns in more recent files and won't have to seek
295    * in older files. Creates a fake key/value with the given row/column and the
296    * highest (most recent) possible timestamp we might get from this file. When
297    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
298    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
299    * <p>
300    * Note that this function does guarantee that the current KV of this scanner
301    * will be advanced to at least the given KV. Because of this, it does have
302    * to do a real seek in cases when the seek timestamp is older than the
303    * highest timestamp of the file, e.g. when we are trying to seek to the next
304    * row/column and use OLDEST_TIMESTAMP in the seek key.
305    */
306   @Override
307   public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
308       throws IOException {
309     if (kv.getFamilyLength() == 0) {
310       useBloom = false;
311     }
312 
313     boolean haveToSeek = true;
314     if (useBloom) {
315       // check ROWCOL Bloom filter first.
316       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
317         haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
318             kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
319             kv.getQualifierOffset(), kv.getQualifierLength());
320       } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
321           ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
322         // if there is no such delete family kv in the store file,
323         // then no need to seek.
324         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
325             kv.getRowOffset(), kv.getRowLength());
326       }
327     }
328 
329     delayedReseek = forward;
330     delayedSeekKV = kv;
331 
332     if (haveToSeek) {
333       // This row/column might be in this store file (or we did not use the
334       // Bloom filter), so we still need to seek.
335       realSeekDone = false;
336       long maxTimestampInFile = reader.getMaxTimestamp();
337       long seekTimestamp = kv.getTimestamp();
338       if (seekTimestamp > maxTimestampInFile) {
339         // Create a fake key that is not greater than the real next key.
340         // (Lower timestamps correspond to higher KVs.)
341         // To understand this better, consider that we are asked to seek to
342         // a higher timestamp than the max timestamp in this file. We know that
343         // the next point when we have to consider this file again is when we
344         // pass the max timestamp of this file (with the same row/column).
345         setCurrentCell(CellUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
346       } else {
347         // This will be the case e.g. when we need to seek to the next
348         // row/column, and we don't know exactly what they are, so we set the
349         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
350         // row/column.
351         enforceSeek();
352       }
353       return cur != null;
354     }
355 
356     // Multi-column Bloom filter optimization.
357     // Create a fake key/value, so that this scanner only bubbles up to the top
358     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
359     // all other store files. The query matcher will then just skip this fake
360     // key/value and the store scanner will progress to the next column. This
361     // is obviously not a "real real" seek, but unlike the fake KV earlier in
362     // this method, we want this to be propagated to ScanQueryMatcher.
363     setCurrentCell(CellUtil.createLastOnRowCol(kv));
364 
365     realSeekDone = true;
366     return true;
367   }
368 
369   Reader getReader() {
370     return reader;
371   }
372 
373   CellComparator getComparator() {
374     return reader.getComparator();
375   }
376 
377   @Override
378   public boolean realSeekDone() {
379     return realSeekDone;
380   }
381 
382   @Override
383   public void enforceSeek() throws IOException {
384     if (realSeekDone)
385       return;
386 
387     if (delayedReseek) {
388       reseek(delayedSeekKV);
389     } else {
390       seek(delayedSeekKV);
391     }
392   }
393 
394   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
395     this.matcher = matcher;
396   }
397 
398   @Override
399   public boolean isFileScanner() {
400     return true;
401   }
402 
403   // Test methods
404 
405   static final long getSeekCount() {
406     return seekCount.get();
407   }
408   static final void instrument() {
409     seekCount = new AtomicLong();
410   }
411 
412   @Override
413   public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
414     return reader.passesTimerangeFilter(scan, oldestUnexpiredTS)
415         && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
416   }
417 
418   @Override
419   public boolean seekToPreviousRow(Cell key) throws IOException {
420     try {
421       try {
422         Cell seekKey = CellUtil.createFirstOnRow(key);
423         if (seekCount != null) seekCount.incrementAndGet();
424         if (!hfs.seekBefore(seekKey)) {
425           this.cur = null;
426           return false;
427         }
428         Cell curCell = hfs.getCell();
429         Cell firstKeyOfPreviousRow = CellUtil.createFirstOnRow(curCell);
430 
431         if (seekCount != null) seekCount.incrementAndGet();
432         if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
433           this.cur = null;
434           return false;
435         }
436 
437         setCurrentCell(hfs.getCell());
438         this.stopSkippingKVsIfNextRow = true;
439         boolean resultOfSkipKVs;
440         try {
441           resultOfSkipKVs = skipKVsNewerThanReadpoint();
442         } finally {
443           this.stopSkippingKVsIfNextRow = false;
444         }
445         if (!resultOfSkipKVs
446             || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
447           return seekToPreviousRow(firstKeyOfPreviousRow);
448         }
449 
450         return true;
451       } finally {
452         realSeekDone = true;
453       }
454     } catch (FileNotFoundException e) {
455       throw e;
456     } catch (IOException ioe) {
457       throw new IOException("Could not seekToPreviousRow " + this + " to key "
458           + key, ioe);
459     }
460   }
461 
462   @Override
463   public boolean seekToLastRow() throws IOException {
464     byte[] lastRow = reader.getLastRowKey();
465     if (lastRow == null) {
466       return false;
467     }
468     KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
469     if (seek(seekKey)) {
470       return true;
471     } else {
472       return seekToPreviousRow(seekKey);
473     }
474   }
475 
476   @Override
477   public boolean backwardSeek(Cell key) throws IOException {
478     seek(key);
479     if (cur == null
480         || getComparator().compareRows(cur, key) > 0) {
481       return seekToPreviousRow(key);
482     }
483     return true;
484   }
485 
486   @Override
487   public Cell getNextIndexedKey() {
488     return hfs.getNextIndexedKey();
489   }
490 
491   @Override
492   public void shipped() throws IOException {
493     this.hfs.shipped();
494   }
495 }