View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.io.TimeRange;
39  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
40  import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
41  
42  /**
43   * KeyValueScanner adaptor over the Reader.  It also provides hooks into
44   * bloom filter things.
45   */
46  @InterfaceAudience.LimitedPrivate("Coprocessor")
47  public class StoreFileScanner implements KeyValueScanner {
48    private static final Log LOG = LogFactory.getLog(HStore.class);
49  
50    // the reader it comes from:
51    private final StoreFile.Reader reader;
52    private final HFileScanner hfs;
53    private Cell cur = null;
54  
55    private boolean realSeekDone;
56    private boolean delayedReseek;
57    private Cell delayedSeekKV;
58  
59    private boolean enforceMVCC = false;
60    private boolean hasMVCCInfo = false;
61    // A flag represents whether could stop skipping KeyValues for MVCC
62    // if have encountered the next row. Only used for reversed scan
63    private boolean stopSkippingKVsIfNextRow = false;
64  
65    private static AtomicLong seekCount;
66  
67    private ScanQueryMatcher matcher;
68  
69    private long readPt;
70  
71    /**
72     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
73     * @param hfs HFile scanner
74     */
75    public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
76        boolean hasMVCC, long readPt) {
77      this.readPt = readPt;
78      this.reader = reader;
79      this.hfs = hfs;
80      this.enforceMVCC = useMVCC;
81      this.hasMVCCInfo = hasMVCC;
82    }
83  
84    boolean isPrimaryReplica() {
85      return reader.isPrimaryReplicaReader();
86    }
87  
88    /**
89     * Return an array of scanners corresponding to the given
90     * set of store files.
91     */
92    public static List<StoreFileScanner> getScannersForStoreFiles(
93        Collection<StoreFile> files,
94        boolean cacheBlocks,
95        boolean usePread, long readPt) throws IOException {
96      return getScannersForStoreFiles(files, cacheBlocks,
97                                     usePread, false, false, readPt);
98    }
99  
100   /**
101    * Return an array of scanners corresponding to the given set of store files.
102    */
103   public static List<StoreFileScanner> getScannersForStoreFiles(
104       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
105       boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
106     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
107         useDropBehind, null, readPt);
108   }
109 
110   /**
111    * Return an array of scanners corresponding to the given set of store files,
112    * And set the ScanQueryMatcher for each store file scanner for further
113    * optimization
114    */
115   public static List<StoreFileScanner> getScannersForStoreFiles(
116       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
117       boolean isCompaction, boolean canUseDrop,
118       ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
119     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
120         files.size());
121     for (StoreFile file : files) {
122       StoreFile.Reader r = file.createReader(canUseDrop);
123       r.setReplicaStoreFile(isPrimaryReplica);
124       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
125           isCompaction, readPt);
126       scanner.setScanQueryMatcher(matcher);
127       scanners.add(scanner);
128     }
129     return scanners;
130   }
131 
132   public static List<StoreFileScanner> getScannersForStoreFiles(
133     Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
134     boolean isCompaction, boolean canUseDrop,
135     ScanQueryMatcher matcher, long readPt) throws IOException {
136     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
137       matcher, readPt, true);
138   }
139 
140   public String toString() {
141     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
142   }
143 
144   public Cell peek() {
145     return cur;
146   }
147 
148   public Cell next() throws IOException {
149     Cell retKey = cur;
150 
151     try {
152       // only seek if we aren't at the end. cur == null implies 'end'.
153       if (cur != null) {
154         hfs.next();
155         setCurrentCell(hfs.getKeyValue());
156         if (hasMVCCInfo || this.reader.isBulkLoaded()) {
157           skipKVsNewerThanReadpoint();
158         }
159       }
160     } catch (FileNotFoundException e) {
161       throw e;
162     } catch(IOException e) {
163       throw new IOException("Could not iterate " + this, e);
164     }
165     return retKey;
166   }
167 
168   public boolean seek(Cell key) throws IOException {
169     if (seekCount != null) seekCount.incrementAndGet();
170 
171     try {
172       try {
173         if(!seekAtOrAfter(hfs, key)) {
174           close();
175           return false;
176         }
177 
178         setCurrentCell(hfs.getKeyValue());
179 
180         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
181           return skipKVsNewerThanReadpoint();
182         } else {
183           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
184         }
185       } finally {
186         realSeekDone = true;
187       }
188     } catch (FileNotFoundException e) {
189       throw e;
190     } catch (IOException ioe) {
191       throw new IOException("Could not seek " + this + " to key " + key, ioe);
192     }
193   }
194 
195   public boolean reseek(Cell key) throws IOException {
196     if (seekCount != null) seekCount.incrementAndGet();
197 
198     try {
199       try {
200         if (!reseekAtOrAfter(hfs, key)) {
201           close();
202           return false;
203         }
204         setCurrentCell(hfs.getKeyValue());
205 
206         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
207           return skipKVsNewerThanReadpoint();
208         } else {
209           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
210         }
211       } finally {
212         realSeekDone = true;
213       }
214     } catch (FileNotFoundException e) {
215       throw e;
216     } catch (IOException ioe) {
217       throw new IOException("Could not reseek " + this + " to key " + key,
218           ioe);
219     }
220   }
221 
222   protected void setCurrentCell(Cell newVal) throws IOException {
223     this.cur = newVal;
224     if (this.cur != null && this.reader.isBulkLoaded()) {
225       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
226     }
227   }
228 
229   protected boolean skipKVsNewerThanReadpoint() throws IOException {
230     // We want to ignore all key-values that are newer than our current
231     // readPoint
232     Cell startKV = cur;
233     while(enforceMVCC
234         && cur != null
235         && (cur.getMvccVersion() > readPt)) {
236       hfs.next();
237       setCurrentCell(hfs.getKeyValue());
238       if (this.stopSkippingKVsIfNextRow
239           && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
240               cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
241               startKV.getRowLength()) > 0) {
242         return false;
243       }
244     }
245 
246     if (cur == null) {
247       close();
248       return false;
249     }
250 
251     return true;
252   }
253 
254   public void close() {
255     // Nothing to close on HFileScanner?
256     cur = null;
257   }
258 
259   /**
260    *
261    * @param s
262    * @param k
263    * @return false if not found or if k is after the end.
264    * @throws IOException
265    */
266   public static boolean seekAtOrAfter(HFileScanner s, Cell k)
267   throws IOException {
268     int result = s.seekTo(k);
269     if(result < 0) {
270       if (result == HConstants.INDEX_KEY_MAGIC) {
271         // using faked key
272         return true;
273       }
274       // Passed KV is smaller than first KV in file, work from start of file
275       return s.seekTo();
276     } else if(result > 0) {
277       // Passed KV is larger than current KV in file, if there is a next
278       // it is the "after", if not then this scanner is done.
279       return s.next();
280     }
281     // Seeked to the exact key
282     return true;
283   }
284 
285   static boolean reseekAtOrAfter(HFileScanner s, Cell k)
286   throws IOException {
287     //This function is similar to seekAtOrAfter function
288     int result = s.reseekTo(k);
289     if (result <= 0) {
290       if (result == HConstants.INDEX_KEY_MAGIC) {
291         // using faked key
292         return true;
293       }
294       // If up to now scanner is not seeked yet, this means passed KV is smaller
295       // than first KV in file, and it is the first time we seek on this file.
296       // So we also need to work from the start of file.
297       if (!s.isSeeked()) {
298         return  s.seekTo();
299       }
300       return true;
301     }
302     // passed KV is larger than current KV in file, if there is a next
303     // it is after, if not then this scanner is done.
304     return s.next();
305   }
306 
307   @Override
308   public long getSequenceID() {
309     return reader.getSequenceID();
310   }
311 
312   /**
313    * Pretend we have done a seek but don't do it yet, if possible. The hope is
314    * that we find requested columns in more recent files and won't have to seek
315    * in older files. Creates a fake key/value with the given row/column and the
316    * highest (most recent) possible timestamp we might get from this file. When
317    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
318    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
319    * <p>
320    * Note that this function does guarantee that the current KV of this scanner
321    * will be advanced to at least the given KV. Because of this, it does have
322    * to do a real seek in cases when the seek timestamp is older than the
323    * highest timestamp of the file, e.g. when we are trying to seek to the next
324    * row/column and use OLDEST_TIMESTAMP in the seek key.
325    */
326   @Override
327   public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
328       throws IOException {
329     if (kv.getFamilyLength() == 0) {
330       useBloom = false;
331     }
332 
333     boolean haveToSeek = true;
334     if (useBloom) {
335       // check ROWCOL Bloom filter first.
336       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
337         haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
338             kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
339             kv.getQualifierOffset(), kv.getQualifierLength());
340       } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
341           ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
342         // if there is no such delete family kv in the store file,
343         // then no need to seek.
344         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
345             kv.getRowOffset(), kv.getRowLength());
346       }
347     }
348 
349     delayedReseek = forward;
350     delayedSeekKV = kv;
351 
352     if (haveToSeek) {
353       // This row/column might be in this store file (or we did not use the
354       // Bloom filter), so we still need to seek.
355       realSeekDone = false;
356       long maxTimestampInFile = reader.getMaxTimestamp();
357       long seekTimestamp = kv.getTimestamp();
358       if (seekTimestamp > maxTimestampInFile) {
359         // Create a fake key that is not greater than the real next key.
360         // (Lower timestamps correspond to higher KVs.)
361         // To understand this better, consider that we are asked to seek to
362         // a higher timestamp than the max timestamp in this file. We know that
363         // the next point when we have to consider this file again is when we
364         // pass the max timestamp of this file (with the same row/column).
365         setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
366       } else {
367         // This will be the case e.g. when we need to seek to the next
368         // row/column, and we don't know exactly what they are, so we set the
369         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
370         // row/column.
371         enforceSeek();
372       }
373       return cur != null;
374     }
375 
376     // Multi-column Bloom filter optimization.
377     // Create a fake key/value, so that this scanner only bubbles up to the top
378     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
379     // all other store files. The query matcher will then just skip this fake
380     // key/value and the store scanner will progress to the next column. This
381     // is obviously not a "real real" seek, but unlike the fake KV earlier in
382     // this method, we want this to be propagated to ScanQueryMatcher.
383     setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
384 
385     realSeekDone = true;
386     return true;
387   }
388 
389   Reader getReader() {
390     return reader;
391   }
392 
393   KeyValue.KVComparator getComparator() {
394     return reader.getComparator();
395   }
396 
397   @Override
398   public boolean realSeekDone() {
399     return realSeekDone;
400   }
401 
402   @Override
403   public void enforceSeek() throws IOException {
404     if (realSeekDone)
405       return;
406 
407     if (delayedReseek) {
408       reseek(delayedSeekKV);
409     } else {
410       seek(delayedSeekKV);
411     }
412   }
413 
414   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
415     this.matcher = matcher;
416   }
417 
418   @Override
419   public boolean isFileScanner() {
420     return true;
421   }
422 
423   // Test methods
424 
425   static final long getSeekCount() {
426     return seekCount.get();
427   }
428   static final void instrument() {
429     seekCount = new AtomicLong();
430   }
431 
432   @Override
433   public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
434     // if the file has no entries, no need to validate or create a scanner.
435     byte[] cf = store.getFamily().getName();
436     TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
437     if (timeRange == null) {
438       timeRange = scan.getTimeRange();
439     }
440     return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
441         .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
442   }
443 
444   @Override
445   @SuppressWarnings("deprecation")
446   public boolean seekToPreviousRow(Cell key) throws IOException {
447     try {
448       try {
449         KeyValue seekKey = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
450             key.getRowLength());
451         if (seekCount != null) seekCount.incrementAndGet();
452         if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
453             seekKey.getKeyLength())) {
454           close();
455           return false;
456         }
457         KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
458             .getRowArray(), hfs.getKeyValue().getRowOffset(), hfs.getKeyValue().getRowLength());
459 
460         if (seekCount != null) seekCount.incrementAndGet();
461         if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
462           close();
463           return false;
464         }
465 
466         setCurrentCell(hfs.getKeyValue());
467         this.stopSkippingKVsIfNextRow = true;
468         boolean resultOfSkipKVs;
469         try {
470           resultOfSkipKVs = skipKVsNewerThanReadpoint();
471         } finally {
472           this.stopSkippingKVsIfNextRow = false;
473         }
474         if (!resultOfSkipKVs
475             || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
476           return seekToPreviousRow(firstKeyOfPreviousRow);
477         }
478 
479         return true;
480       } finally {
481         realSeekDone = true;
482       }
483     } catch (FileNotFoundException e) {
484       throw e;
485     } catch (IOException ioe) {
486       throw new IOException("Could not seekToPreviousRow " + this + " to key "
487           + key, ioe);
488     }
489   }
490 
491   @Override
492   public boolean seekToLastRow() throws IOException {
493     byte[] lastRow = reader.getLastRowKey();
494     if (lastRow == null) {
495       return false;
496     }
497     KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
498     if (seek(seekKey)) {
499       return true;
500     } else {
501       return seekToPreviousRow(seekKey);
502     }
503   }
504 
505   @Override
506   public boolean backwardSeek(Cell key) throws IOException {
507     seek(key);
508     if (cur == null
509         || getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
510             cur.getRowLength(), key.getRowArray(), key.getRowOffset(),
511             key.getRowLength()) > 0) {
512       return seekToPreviousRow(key);
513     }
514     return true;
515   }
516 
517   @Override
518   public Cell getNextIndexedKey() {
519     return hfs.getNextIndexedKey();
520   }
521 }