View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.io.TimeRange;
39  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
40  import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
41  
42  /**
43   * KeyValueScanner adaptor over the Reader.  It also provides hooks into
44   * bloom filter things.
45   */
46  @InterfaceAudience.LimitedPrivate("Coprocessor")
47  public class StoreFileScanner implements KeyValueScanner {
48    private static final Log LOG = LogFactory.getLog(HStore.class);
49  
50    // the reader it comes from:
51    private final StoreFile.Reader reader;
52    private final HFileScanner hfs;
53    private Cell cur = null;
54    private boolean closed = false;
55  
56    private boolean realSeekDone;
57    private boolean delayedReseek;
58    private Cell delayedSeekKV;
59  
60    private boolean enforceMVCC = false;
61    private boolean hasMVCCInfo = false;
62    // A flag represents whether could stop skipping KeyValues for MVCC
63    // if have encountered the next row. Only used for reversed scan
64    private boolean stopSkippingKVsIfNextRow = false;
65  
66    private static AtomicLong seekCount;
67  
68    private ScanQueryMatcher matcher;
69  
70    private long readPt;
71  
72    /**
73     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
74     * @param hfs HFile scanner
75     */
76    public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
77        boolean hasMVCC, long readPt) {
78      this.readPt = readPt;
79      this.reader = reader;
80      this.hfs = hfs;
81      this.enforceMVCC = useMVCC;
82      this.hasMVCCInfo = hasMVCC;
83    }
84  
85    boolean isPrimaryReplica() {
86      return reader.isPrimaryReplicaReader();
87    }
88  
89    /**
90     * Return an array of scanners corresponding to the given
91     * set of store files.
92     */
93    public static List<StoreFileScanner> getScannersForStoreFiles(
94        Collection<StoreFile> files,
95        boolean cacheBlocks,
96        boolean usePread, long readPt) throws IOException {
97      return getScannersForStoreFiles(files, cacheBlocks,
98                                     usePread, false, false, readPt);
99    }
100 
101   /**
102    * Return an array of scanners corresponding to the given set of store files.
103    */
104   public static List<StoreFileScanner> getScannersForStoreFiles(
105       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
106       boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
107     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
108         useDropBehind, null, readPt);
109   }
110 
111   /**
112    * Return an array of scanners corresponding to the given set of store files,
113    * And set the ScanQueryMatcher for each store file scanner for further
114    * optimization
115    */
116   public static List<StoreFileScanner> getScannersForStoreFiles(
117       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
118       boolean isCompaction, boolean canUseDrop,
119       ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
120     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
121         files.size());
122     boolean succ = false;
123     try {
124       for (StoreFile file : files) {
125         StoreFile.Reader r = file.createReader(canUseDrop);
126         r.setReplicaStoreFile(isPrimaryReplica);
127         StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
128             isCompaction, readPt);
129         scanner.setScanQueryMatcher(matcher);
130         scanners.add(scanner);
131       }
132       succ = true;
133     } finally {
134       if (!succ) {
135         for (StoreFileScanner scanner : scanners) {
136           scanner.close();
137         }
138       }
139     }
140     return scanners;
141   }
142 
143   public static List<StoreFileScanner> getScannersForStoreFiles(
144     Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
145     boolean isCompaction, boolean canUseDrop,
146     ScanQueryMatcher matcher, long readPt) throws IOException {
147     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop,
148       matcher, readPt, true);
149   }
150 
151   public String toString() {
152     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
153   }
154 
155   public Cell peek() {
156     return cur;
157   }
158 
159   public Cell next() throws IOException {
160     Cell retKey = cur;
161 
162     try {
163       // only seek if we aren't at the end. cur == null implies 'end'.
164       if (cur != null) {
165         hfs.next();
166         setCurrentCell(hfs.getKeyValue());
167         if (hasMVCCInfo || this.reader.isBulkLoaded()) {
168           skipKVsNewerThanReadpoint();
169         }
170       }
171     } catch (FileNotFoundException e) {
172       throw e;
173     } catch(IOException e) {
174       throw new IOException("Could not iterate " + this, e);
175     }
176     return retKey;
177   }
178 
179   public boolean seek(Cell key) throws IOException {
180     if (seekCount != null) seekCount.incrementAndGet();
181 
182     try {
183       try {
184         if(!seekAtOrAfter(hfs, key)) {
185           close();
186           return false;
187         }
188 
189         setCurrentCell(hfs.getKeyValue());
190 
191         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
192           return skipKVsNewerThanReadpoint();
193         } else {
194           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
195         }
196       } finally {
197         realSeekDone = true;
198       }
199     } catch (FileNotFoundException e) {
200       throw e;
201     } catch (IOException ioe) {
202       throw new IOException("Could not seek " + this + " to key " + key, ioe);
203     }
204   }
205 
206   public boolean reseek(Cell key) throws IOException {
207     if (seekCount != null) seekCount.incrementAndGet();
208 
209     try {
210       try {
211         if (!reseekAtOrAfter(hfs, key)) {
212           close();
213           return false;
214         }
215         setCurrentCell(hfs.getKeyValue());
216 
217         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
218           return skipKVsNewerThanReadpoint();
219         } else {
220           return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
221         }
222       } finally {
223         realSeekDone = true;
224       }
225     } catch (FileNotFoundException e) {
226       throw e;
227     } catch (IOException ioe) {
228       throw new IOException("Could not reseek " + this + " to key " + key,
229           ioe);
230     }
231   }
232 
233   protected void setCurrentCell(Cell newVal) throws IOException {
234     this.cur = newVal;
235     if (this.cur != null && this.reader.isBulkLoaded()) {
236       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
237     }
238   }
239 
240   protected boolean skipKVsNewerThanReadpoint() throws IOException {
241     // We want to ignore all key-values that are newer than our current
242     // readPoint
243     Cell startKV = cur;
244     while(enforceMVCC
245         && cur != null
246         && (cur.getMvccVersion() > readPt)) {
247       hfs.next();
248       setCurrentCell(hfs.getKeyValue());
249       if (this.stopSkippingKVsIfNextRow
250           && getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
251               cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
252               startKV.getRowLength()) > 0) {
253         return false;
254       }
255     }
256 
257     if (cur == null) {
258       close();
259       return false;
260     }
261 
262     return true;
263   }
264 
265   public void close() {
266     cur = null;
267     if (closed) return;
268     closed = true;
269     this.hfs.close();
270   }
271 
272   /**
273    *
274    * @param s
275    * @param k
276    * @return false if not found or if k is after the end.
277    * @throws IOException
278    */
279   public static boolean seekAtOrAfter(HFileScanner s, Cell k)
280   throws IOException {
281     int result = s.seekTo(k);
282     if(result < 0) {
283       if (result == HConstants.INDEX_KEY_MAGIC) {
284         // using faked key
285         return true;
286       }
287       // Passed KV is smaller than first KV in file, work from start of file
288       return s.seekTo();
289     } else if(result > 0) {
290       // Passed KV is larger than current KV in file, if there is a next
291       // it is the "after", if not then this scanner is done.
292       return s.next();
293     }
294     // Seeked to the exact key
295     return true;
296   }
297 
298   static boolean reseekAtOrAfter(HFileScanner s, Cell k)
299   throws IOException {
300     //This function is similar to seekAtOrAfter function
301     int result = s.reseekTo(k);
302     if (result <= 0) {
303       if (result == HConstants.INDEX_KEY_MAGIC) {
304         // using faked key
305         return true;
306       }
307       // If up to now scanner is not seeked yet, this means passed KV is smaller
308       // than first KV in file, and it is the first time we seek on this file.
309       // So we also need to work from the start of file.
310       if (!s.isSeeked()) {
311         return  s.seekTo();
312       }
313       return true;
314     }
315     // passed KV is larger than current KV in file, if there is a next
316     // it is after, if not then this scanner is done.
317     return s.next();
318   }
319 
320   @Override
321   public long getSequenceID() {
322     return reader.getSequenceID();
323   }
324 
325   /**
326    * Pretend we have done a seek but don't do it yet, if possible. The hope is
327    * that we find requested columns in more recent files and won't have to seek
328    * in older files. Creates a fake key/value with the given row/column and the
329    * highest (most recent) possible timestamp we might get from this file. When
330    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
331    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
332    * <p>
333    * Note that this function does guarantee that the current KV of this scanner
334    * will be advanced to at least the given KV. Because of this, it does have
335    * to do a real seek in cases when the seek timestamp is older than the
336    * highest timestamp of the file, e.g. when we are trying to seek to the next
337    * row/column and use OLDEST_TIMESTAMP in the seek key.
338    */
339   @Override
340   public boolean requestSeek(Cell kv, boolean forward, boolean useBloom)
341       throws IOException {
342     if (kv.getFamilyLength() == 0) {
343       useBloom = false;
344     }
345 
346     boolean haveToSeek = true;
347     if (useBloom) {
348       // check ROWCOL Bloom filter first.
349       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
350         haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
351             kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
352             kv.getQualifierOffset(), kv.getQualifierLength());
353       } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
354           ((CellUtil.isDeleteFamily(kv) || CellUtil.isDeleteFamilyVersion(kv)))) {
355         // if there is no such delete family kv in the store file,
356         // then no need to seek.
357         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
358             kv.getRowOffset(), kv.getRowLength());
359       }
360     }
361 
362     delayedReseek = forward;
363     delayedSeekKV = kv;
364 
365     if (haveToSeek) {
366       // This row/column might be in this store file (or we did not use the
367       // Bloom filter), so we still need to seek.
368       realSeekDone = false;
369       long maxTimestampInFile = reader.getMaxTimestamp();
370       long seekTimestamp = kv.getTimestamp();
371       if (seekTimestamp > maxTimestampInFile) {
372         // Create a fake key that is not greater than the real next key.
373         // (Lower timestamps correspond to higher KVs.)
374         // To understand this better, consider that we are asked to seek to
375         // a higher timestamp than the max timestamp in this file. We know that
376         // the next point when we have to consider this file again is when we
377         // pass the max timestamp of this file (with the same row/column).
378         setCurrentCell(KeyValueUtil.createFirstOnRowColTS(kv, maxTimestampInFile));
379       } else {
380         // This will be the case e.g. when we need to seek to the next
381         // row/column, and we don't know exactly what they are, so we set the
382         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
383         // row/column.
384         enforceSeek();
385       }
386       return cur != null;
387     }
388 
389     // Multi-column Bloom filter optimization.
390     // Create a fake key/value, so that this scanner only bubbles up to the top
391     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
392     // all other store files. The query matcher will then just skip this fake
393     // key/value and the store scanner will progress to the next column. This
394     // is obviously not a "real real" seek, but unlike the fake KV earlier in
395     // this method, we want this to be propagated to ScanQueryMatcher.
396     setCurrentCell(KeyValueUtil.createLastOnRowCol(kv));
397 
398     realSeekDone = true;
399     return true;
400   }
401 
402   Reader getReader() {
403     return reader;
404   }
405 
406   KeyValue.KVComparator getComparator() {
407     return reader.getComparator();
408   }
409 
410   @Override
411   public boolean realSeekDone() {
412     return realSeekDone;
413   }
414 
415   @Override
416   public void enforceSeek() throws IOException {
417     if (realSeekDone)
418       return;
419 
420     if (delayedReseek) {
421       reseek(delayedSeekKV);
422     } else {
423       seek(delayedSeekKV);
424     }
425   }
426 
427   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
428     this.matcher = matcher;
429   }
430 
431   @Override
432   public boolean isFileScanner() {
433     return true;
434   }
435 
436   // Test methods
437 
438   static final long getSeekCount() {
439     return seekCount.get();
440   }
441   static final void instrument() {
442     seekCount = new AtomicLong();
443   }
444 
445   @Override
446   public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
447     // if the file has no entries, no need to validate or create a scanner.
448     byte[] cf = store.getFamily().getName();
449     TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
450     if (timeRange == null) {
451       timeRange = scan.getTimeRange();
452     }
453     return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
454         .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
455   }
456 
457   @Override
458   @SuppressWarnings("deprecation")
459   public boolean seekToPreviousRow(Cell originalKey) throws IOException {
460     try {
461       try {
462         boolean keepSeeking = false;
463         Cell key = originalKey;
464         do {
465           KeyValue seekKey = KeyValueUtil.createFirstOnRow(key.getRowArray(), key.getRowOffset(),
466               key.getRowLength());
467           if (seekCount != null) seekCount.incrementAndGet();
468           if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
469               seekKey.getKeyLength())) {
470             close();
471             return false;
472           }
473           KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
474               .getRowArray(), hfs.getKeyValue().getRowOffset(), hfs.getKeyValue().getRowLength());
475 
476           if (seekCount != null) seekCount.incrementAndGet();
477           if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
478             close();
479             return false;
480           }
481 
482           setCurrentCell(hfs.getKeyValue());
483           this.stopSkippingKVsIfNextRow = true;
484           boolean resultOfSkipKVs;
485           try {
486             resultOfSkipKVs = skipKVsNewerThanReadpoint();
487           } finally {
488             this.stopSkippingKVsIfNextRow = false;
489           }
490           if (!resultOfSkipKVs
491               || getComparator().compareRows(cur, firstKeyOfPreviousRow) > 0) {
492             keepSeeking = true;
493             key = firstKeyOfPreviousRow;
494             continue;
495           } else {
496             keepSeeking = false;
497           }
498         } while (keepSeeking);
499         return true;
500       } finally {
501         realSeekDone = true;
502       }
503     } catch (FileNotFoundException e) {
504       throw e;
505     } catch (IOException ioe) {
506       throw new IOException("Could not seekToPreviousRow " + this + " to key "
507           + originalKey, ioe);
508     }
509   }
510 
511   @Override
512   public boolean seekToLastRow() throws IOException {
513     byte[] lastRow = reader.getLastRowKey();
514     if (lastRow == null) {
515       return false;
516     }
517     KeyValue seekKey = KeyValueUtil.createFirstOnRow(lastRow);
518     if (seek(seekKey)) {
519       return true;
520     } else {
521       return seekToPreviousRow(seekKey);
522     }
523   }
524 
525   @Override
526   public boolean backwardSeek(Cell key) throws IOException {
527     seek(key);
528     if (cur == null
529         || getComparator().compareRows(cur.getRowArray(), cur.getRowOffset(),
530             cur.getRowLength(), key.getRowArray(), key.getRowOffset(),
531             key.getRowLength()) > 0) {
532       return seekToPreviousRow(key);
533     }
534     return true;
535   }
536 
537   @Override
538   public Cell getNextIndexedKey() {
539     return hfs.getNextIndexedKey();
540   }
541 }