View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.List;
26  import java.util.SortedSet;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
39  import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
40  
41  /**
42   * KeyValueScanner adaptor over the Reader.  It also provides hooks into
43   * bloom filter things.
44   */
45  @InterfaceAudience.LimitedPrivate("Coprocessor")
46  public class StoreFileScanner implements KeyValueScanner {
47    static final Log LOG = LogFactory.getLog(HStore.class);
48  
49    // the reader it comes from:
50    private final StoreFile.Reader reader;
51    private final HFileScanner hfs;
52    private Cell cur = null;
53  
54    private boolean realSeekDone;
55    private boolean delayedReseek;
56    private Cell delayedSeekKV;
57  
58    private boolean enforceMVCC = false;
59    private boolean hasMVCCInfo = false;
60    // A flag represents whether could stop skipping KeyValues for MVCC
61    // if have encountered the next row. Only used for reversed scan
62    private boolean stopSkippingKVsIfNextRow = false;
63  
64    private static AtomicLong seekCount;
65  
66    private ScanQueryMatcher matcher;
67    
68    private long readPt;
69  
70    /**
71     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
72     * @param hfs HFile scanner
73     */
74    public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
75        boolean hasMVCC, long readPt) {
76      this.readPt = readPt;
77      this.reader = reader;
78      this.hfs = hfs;
79      this.enforceMVCC = useMVCC;
80      this.hasMVCCInfo = hasMVCC;
81    }
82  
83    /**
84     * Return an array of scanners corresponding to the given
85     * set of store files.
86     */
87    public static List<StoreFileScanner> getScannersForStoreFiles(
88        Collection<StoreFile> files,
89        boolean cacheBlocks,
90        boolean usePread, long readPt) throws IOException {
91      return getScannersForStoreFiles(files, cacheBlocks,
92                                     usePread, false, readPt);
93    }
94  
95    /**
96     * Return an array of scanners corresponding to the given set of store files.
97     */
98    public static List<StoreFileScanner> getScannersForStoreFiles(
99        Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
100       boolean isCompaction, long readPt) throws IOException {
101     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
102         null, readPt);
103   }
104 
105   /**
106    * Return an array of scanners corresponding to the given set of store files,
107    * And set the ScanQueryMatcher for each store file scanner for further
108    * optimization
109    */
110   public static List<StoreFileScanner> getScannersForStoreFiles(
111       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
112       boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
113     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
114         files.size());
115     for (StoreFile file : files) {
116       StoreFile.Reader r = file.createReader();
117       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
118           isCompaction, readPt);
119       scanner.setScanQueryMatcher(matcher);
120       scanners.add(scanner);
121     }
122     return scanners;
123   }
124 
125   public String toString() {
126     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
127   }
128 
129   public Cell peek() {
130     return cur;
131   }
132 
133   public Cell next() throws IOException {
134     Cell retKey = cur;
135 
136     try {
137       // only seek if we aren't at the end. cur == null implies 'end'.
138       if (cur != null) {
139         hfs.next();
140         setCurrentCell(hfs.getKeyValue());
141         if (hasMVCCInfo || this.reader.isBulkLoaded()) {
142           skipKVsNewerThanReadpoint();
143         }
144       }
145     } catch(IOException e) {
146       throw new IOException("Could not iterate " + this, e);
147     }
148     return retKey;
149   }
150 
151   public boolean seek(Cell key) throws IOException {
152     if (seekCount != null) seekCount.incrementAndGet();
153 
154     try {
155       try {
156         if(!seekAtOrAfter(hfs, key)) {
157           close();
158           return false;
159         }
160 
161         setCurrentCell(hfs.getKeyValue());
162 
163         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
164           return skipKVsNewerThanReadpoint();
165         } else {
166           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
167         }
168       } finally {
169         realSeekDone = true;
170       }
171     } catch (IOException ioe) {
172       throw new IOException("Could not seek " + this + " to key " + key, ioe);
173     }
174   }
175 
176   public boolean reseek(Cell key) throws IOException {
177     if (seekCount != null) seekCount.incrementAndGet();
178 
179     try {
180       try {
181         if (!reseekAtOrAfter(hfs, key)) {
182           close();
183           return false;
184         }
185         setCurrentCell(hfs.getKeyValue());
186 
187         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
188           return skipKVsNewerThanReadpoint();
189         } else {
190           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
191         }
192       } finally {
193         realSeekDone = true;
194       }
195     } catch (IOException ioe) {
196       throw new IOException("Could not reseek " + this + " to key " + key,
197           ioe);
198     }
199   }
200 
201   protected void setCurrentCell(Cell newVal) throws IOException {
202     this.cur = newVal;
203     if (this.cur != null && this.reader.isBulkLoaded()) {
204       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
205     }
206   }
207 
208   protected boolean skipKVsNewerThanReadpoint() throws IOException {
209     // We want to ignore all key-values that are newer than our current
210     // readPoint
211     Cell startKV = cur;
212     while(enforceMVCC
213         && cur != null
214         && (cur.getMvccVersion() > readPt)) {
215       hfs.next();
216       setCurrentCell(hfs.getKeyValue());
217       if (this.stopSkippingKVsIfNextRow
218           && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
219               cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
220               startKV.getRowLength()) > 0) {
221         return false;
222       }
223     }
224 
225     if (cur == null) {
226       close();
227       return false;
228     }
229 
230     return true;
231   }
232 
233   public void close() {
234     // Nothing to close on HFileScanner?
235     cur = null;
236   }
237 
238   /**
239    *
240    * @param s
241    * @param k
242    * @return false if not found or if k is after the end.
243    * @throws IOException
244    */
245   public static boolean seekAtOrAfter(HFileScanner s, Cell k)
246   throws IOException {
247     int result = s.seekTo(k);
248     if(result < 0) {
249       if (result == HConstants.INDEX_KEY_MAGIC) {
250         // using faked key
251         return true;
252       }
253       // Passed KV is smaller than first KV in file, work from start of file
254       return s.seekTo();
255     } else if(result > 0) {
256       // Passed KV is larger than current KV in file, if there is a next
257       // it is the "after", if not then this scanner is done.
258       return s.next();
259     }
260     // Seeked to the exact key
261     return true;
262   }
263 
264   static boolean reseekAtOrAfter(HFileScanner s, Cell k)
265   throws IOException {
266     //This function is similar to seekAtOrAfter function
267     int result = s.reseekTo(k);
268     if (result <= 0) {
269       if (result == HConstants.INDEX_KEY_MAGIC) {
270         // using faked key
271         return true;
272       }
273       // If up to now scanner is not seeked yet, this means passed KV is smaller
274       // than first KV in file, and it is the first time we seek on this file.
275       // So we also need to work from the start of file.
276       if (!s.isSeeked()) {
277         return  s.seekTo();
278       }
279       return true;
280     }
281     // passed KV is larger than current KV in file, if there is a next
282     // it is after, if not then this scanner is done.
283     return s.next();
284   }
285 
286   @Override
287   public long getSequenceID() {
288     return reader.getSequenceID();
289   }
290 
291   /**
292    * Pretend we have done a seek but don't do it yet, if possible. The hope is
293    * that we find requested columns in more recent files and won't have to seek
294    * in older files. Creates a fake key/value with the given row/column and the
295    * highest (most recent) possible timestamp we might get from this file. When
296    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
297    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
298    * <p>
299    * Note that this function does guarantee that the current KV of this scanner
300    * will be advanced to at least the given KV. Because of this, it does have
301    * to do a real seek in cases when the seek timestamp is older than the
302    * highest timestamp of the file, e.g. when we are trying to seek to the next
303    * row/column and use OLDEST_TIMESTAMP in the seek key.
304    */
305   @Override
306   public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
307       throws IOException {
308     if (kv.getFamilyLength() == 0) {
309       useBloom = false;
310     }
311 
312     boolean haveToSeek = true;
313     if (useBloom) {
314       // check ROWCOL Bloom filter first.
315       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
316         haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
317             kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
318             kv.getQualifierOffset(), kv.getQualifierLength());
319       } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
320           ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
321         // if there is no such delete family kv in the store file,
322         // then no need to seek.
323         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
324             kv.getRowOffset(), kv.getRowLength());
325       }
326     }
327 
328     delayedReseek = forward;
329     delayedSeekKV = kv;
330 
331     if (haveToSeek) {
332       // This row/column might be in this store file (or we did not use the
333       // Bloom filter), so we still need to seek.
334       realSeekDone = false;
335       long maxTimestampInFile = reader.getMaxTimestamp();
336       long seekTimestamp = kv.getTimestamp();
337       if (seekTimestamp > maxTimestampInFile) {
338         // Create a fake key that is not greater than the real next key.
339         // (Lower timestamps correspond to higher KVs.)
340         // To understand this better, consider that we are asked to seek to
341         // a higher timestamp than the max timestamp in this file. We know that
342         // the next point when we have to consider this file again is when we
343         // pass the max timestamp of this file (with the same row/column).
344         setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
345       } else {
346         // This will be the case e.g. when we need to seek to the next
347         // row/column, and we don't know exactly what they are, so we set the
348         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
349         // row/column.
350         enforceSeek();
351       }
352       return cur != null;
353     }
354 
355     // Multi-column Bloom filter optimization.
356     // Create a fake key/value, so that this scanner only bubbles up to the top
357     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
358     // all other store files. The query matcher will then just skip this fake
359     // key/value and the store scanner will progress to the next column. This
360     // is obviously not a "real real" seek, but unlike the fake KV earlier in
361     // this method, we want this to be propagated to ScanQueryMatcher.
362     setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
363 
364     realSeekDone = true;
365     return true;
366   }
367 
368   Reader getReader() {
369     return reader;
370   }
371 
372   KeyValue.KVComparator getComparator() {
373     return reader.getComparator();
374   }
375 
376   @Override
377   public boolean realSeekDone() {
378     return realSeekDone;
379   }
380 
381   @Override
382   public void enforceSeek() throws IOException {
383     if (realSeekDone)
384       return;
385 
386     if (delayedReseek) {
387       reseek(delayedSeekKV);
388     } else {
389       seek(delayedSeekKV);
390     }
391   }
392 
393   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
394     this.matcher = matcher;
395   }
396 
397   @Override
398   public boolean isFileScanner() {
399     return true;
400   }
401 
402   // Test methods
403 
404   static final long getSeekCount() {
405     return seekCount.get();
406   }
407   static final void instrument() {
408     seekCount = new AtomicLong();
409   }
410 
411   @Override
412   public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
413     return reader.passesTimerangeFilter(scan, oldestUnexpiredTS)
414         && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
415   }
416 
417   @Override
418   @SuppressWarnings("deprecation")
419   public boolean seekToPreviousRow(Cell key) throws IOException {
420     try {
421       try {
422         KeyValue seekKey = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
423             key.getRowLength());
424         if (seekCount != null) seekCount.incrementAndGet();
425         if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
426             seekKey.getKeyLength())) {
427           close();
428           return false;
429         }
430         KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
431             .getRowArray(), hfs.getKeyValue().getRowOffset(), hfs.getKeyValue().getRowLength());
432 
433         if (seekCount != null) seekCount.incrementAndGet();
434         if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
435           close();
436           return false;
437         }
438 
439         setCurrentCell(hfs.getKeyValue());
440         this.stopSkippingKVsIfNextRow = true;
441         boolean resultOfSkipKVs;
442         try {
443           resultOfSkipKVs = skipKVsNewerThanReadpoint();
444         } finally {
445           this.stopSkippingKVsIfNextRow = false;
446         }
447         if (!resultOfSkipKVs
448             || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
449           return seekToPreviousRow(firstKeyOfPreviousRow);
450         }
451 
452         return true;
453       } finally {
454         realSeekDone = true;
455       }
456     } catch (IOException ioe) {
457       throw new IOException("Could not seekToPreviousRow " + this + " to key "
458           + key, ioe);
459     }
460   }
461 
462   @Override
463   public boolean seekToLastRow() throws IOException {
464     byte[] lastRow = reader.getLastRowKey();
465     if (lastRow == null) {
466       return false;
467     }
468     KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
469     if (seek(seekKey)) {
470       return true;
471     } else {
472       return seekToPreviousRow(seekKey);
473     }
474   }
475 
476   @Override
477   public boolean backwardSeek(Cell key) throws IOException {
478     seek(key);
479     if (cur == null
480         || getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
481             cur.getRowLength(), key.getRowArray(), key.getRowOffset(),
482             key.getRowLength()) > 0) {
483       return seekToPreviousRow(key);
484     }
485     return true;
486   }
487 
488   @Override
489   public Cell getNextIndexedKey() {
490     return hfs.getNextIndexedKey();
491   }
492 }