View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.List;
26  import java.util.SortedSet;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.client.Scan;
35  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
36  import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
37  import org.apache.hadoop.hbase.util.Bytes;
38  
39  /**
40   * KeyValueScanner adaptor over the Reader.  It also provides hooks into
41   * bloom filter things.
42   */
43  @InterfaceAudience.LimitedPrivate("Coprocessor")
44  public class StoreFileScanner implements KeyValueScanner {
45    static final Log LOG = LogFactory.getLog(HStore.class);
46  
47    // the reader it comes from:
48    private final StoreFile.Reader reader;
49    private final HFileScanner hfs;
50    private KeyValue cur = null;
51  
52    private boolean realSeekDone;
53    private boolean delayedReseek;
54    private KeyValue delayedSeekKV;
55  
56    private boolean enforceMVCC = false;
57    private boolean hasMVCCInfo = false;
58    // A flag represents whether could stop skipping KeyValues for MVCC
59    // if have encountered the next row. Only used for reversed scan
60    private boolean stopSkippingKVsIfNextRow = false;
61  
62    private static AtomicLong seekCount;
63  
64    private ScanQueryMatcher matcher;
65    
66    private long readPt;
67  
68    /**
69     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
70     * @param hfs HFile scanner
71     */
72    public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
73        boolean hasMVCC, long readPt) {
74      this.readPt = readPt;
75      this.reader = reader;
76      this.hfs = hfs;
77      this.enforceMVCC = useMVCC;
78      this.hasMVCCInfo = hasMVCC;
79    }
80  
81    /**
82     * Return an array of scanners corresponding to the given
83     * set of store files.
84     */
85    public static List<StoreFileScanner> getScannersForStoreFiles(
86        Collection<StoreFile> files,
87        boolean cacheBlocks,
88        boolean usePread, long readPt) throws IOException {
89      return getScannersForStoreFiles(files, cacheBlocks,
90                                     usePread, false, readPt);
91    }
92  
93    /**
94     * Return an array of scanners corresponding to the given set of store files.
95     */
96    public static List<StoreFileScanner> getScannersForStoreFiles(
97        Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
98        boolean isCompaction, long readPt) throws IOException {
99      return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
100         null, readPt);
101   }
102 
103   /**
104    * Return an array of scanners corresponding to the given set of store files,
105    * And set the ScanQueryMatcher for each store file scanner for further
106    * optimization
107    */
108   public static List<StoreFileScanner> getScannersForStoreFiles(
109       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
110       boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
111     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
112         files.size());
113     for (StoreFile file : files) {
114       StoreFile.Reader r = file.createReader();
115       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
116           isCompaction, readPt);
117       scanner.setScanQueryMatcher(matcher);
118       scanners.add(scanner);
119     }
120     return scanners;
121   }
122 
123   public String toString() {
124     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
125   }
126 
127   public KeyValue peek() {
128     return cur;
129   }
130 
131   public KeyValue next() throws IOException {
132     KeyValue retKey = cur;
133 
134     try {
135       // only seek if we aren't at the end. cur == null implies 'end'.
136       if (cur != null) {
137         hfs.next();
138         cur = hfs.getKeyValue();
139         if (hasMVCCInfo)
140           skipKVsNewerThanReadpoint();
141       }
142     } catch(IOException e) {
143       throw new IOException("Could not iterate " + this, e);
144     }
145     return retKey;
146   }
147 
148   public boolean seek(KeyValue key) throws IOException {
149     if (seekCount != null) seekCount.incrementAndGet();
150 
151     try {
152       try {
153         if(!seekAtOrAfter(hfs, key)) {
154           close();
155           return false;
156         }
157 
158         cur = hfs.getKeyValue();
159 
160         return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
161       } finally {
162         realSeekDone = true;
163       }
164     } catch (IOException ioe) {
165       throw new IOException("Could not seek " + this + " to key " + key, ioe);
166     }
167   }
168 
169   public boolean reseek(KeyValue key) throws IOException {
170     if (seekCount != null) seekCount.incrementAndGet();
171 
172     try {
173       try {
174         if (!reseekAtOrAfter(hfs, key)) {
175           close();
176           return false;
177         }
178         cur = hfs.getKeyValue();
179 
180         return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
181       } finally {
182         realSeekDone = true;
183       }
184     } catch (IOException ioe) {
185       throw new IOException("Could not reseek " + this + " to key " + key,
186           ioe);
187     }
188   }
189 
190   protected boolean skipKVsNewerThanReadpoint() throws IOException {
191     // We want to ignore all key-values that are newer than our current
192     // readPoint
193     KeyValue startKV = cur;
194     while(enforceMVCC
195         && cur != null
196         && (cur.getMvccVersion() > readPt)) {
197       hfs.next();
198       cur = hfs.getKeyValue();
199       if (this.stopSkippingKVsIfNextRow
200           && Bytes.compareTo(cur.getRowArray(), cur.getRowOffset(),
201               cur.getRowLength(), startKV.getRowArray(), startKV.getRowOffset(),
202               startKV.getRowLength()) > 0) {
203         return false;
204       }
205     }
206 
207     if (cur == null) {
208       close();
209       return false;
210     }
211 
212     // For the optimisation in HBASE-4346, we set the KV's memstoreTS to
213     // 0, if it is older than all the scanners' read points. It is possible
214     // that a newer KV's memstoreTS was reset to 0. But, there is an
215     // older KV which was not reset to 0 (because it was
216     // not old enough during flush). Make sure that we set it correctly now,
217     // so that the comparision order does not change.
218     if (cur.getMvccVersion() <= readPt) {
219       cur.setMvccVersion(0);
220     }
221     return true;
222   }
223 
224   public void close() {
225     // Nothing to close on HFileScanner?
226     cur = null;
227   }
228 
229   /**
230    *
231    * @param s
232    * @param k
233    * @return false if not found or if k is after the end.
234    * @throws IOException
235    */
236   public static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
237   throws IOException {
238     int result = s.seekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
239     if(result < 0) {
240       if (result == HConstants.INDEX_KEY_MAGIC) {
241         // using faked key
242         return true;
243       }
244       // Passed KV is smaller than first KV in file, work from start of file
245       return s.seekTo();
246     } else if(result > 0) {
247       // Passed KV is larger than current KV in file, if there is a next
248       // it is the "after", if not then this scanner is done.
249       return s.next();
250     }
251     // Seeked to the exact key
252     return true;
253   }
254 
255   static boolean reseekAtOrAfter(HFileScanner s, KeyValue k)
256   throws IOException {
257     //This function is similar to seekAtOrAfter function
258     int result = s.reseekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
259     if (result <= 0) {
260       if (result == HConstants.INDEX_KEY_MAGIC) {
261         // using faked key
262         return true;
263       }
264       // If up to now scanner is not seeked yet, this means passed KV is smaller
265       // than first KV in file, and it is the first time we seek on this file.
266       // So we also need to work from the start of file.
267       if (!s.isSeeked()) {
268         return  s.seekTo();
269       }
270       return true;
271     }
272     // passed KV is larger than current KV in file, if there is a next
273     // it is after, if not then this scanner is done.
274     return s.next();
275   }
276 
277   @Override
278   public long getSequenceID() {
279     return reader.getSequenceID();
280   }
281 
282   /**
283    * Pretend we have done a seek but don't do it yet, if possible. The hope is
284    * that we find requested columns in more recent files and won't have to seek
285    * in older files. Creates a fake key/value with the given row/column and the
286    * highest (most recent) possible timestamp we might get from this file. When
287    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
288    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
289    * <p>
290    * Note that this function does guarantee that the current KV of this scanner
291    * will be advanced to at least the given KV. Because of this, it does have
292    * to do a real seek in cases when the seek timestamp is older than the
293    * highest timestamp of the file, e.g. when we are trying to seek to the next
294    * row/column and use OLDEST_TIMESTAMP in the seek key.
295    */
296   @Override
297   public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom)
298       throws IOException {
299     if (kv.getFamilyLength() == 0) {
300       useBloom = false;
301     }
302 
303     boolean haveToSeek = true;
304     if (useBloom) {
305       // check ROWCOL Bloom filter first.
306       if (reader.getBloomFilterType() == BloomType.ROWCOL) {
307         haveToSeek = reader.passesGeneralBloomFilter(kv.getRowArray(),
308             kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
309             kv.getQualifierOffset(), kv.getQualifierLength());
310       } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
311           (kv.isDeleteFamily() || kv.isDeleteFamilyVersion())) {
312         // if there is no such delete family kv in the store file,
313         // then no need to seek.
314         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getRowArray(),
315             kv.getRowOffset(), kv.getRowLength());
316       }
317     }
318 
319     delayedReseek = forward;
320     delayedSeekKV = kv;
321 
322     if (haveToSeek) {
323       // This row/column might be in this store file (or we did not use the
324       // Bloom filter), so we still need to seek.
325       realSeekDone = false;
326       long maxTimestampInFile = reader.getMaxTimestamp();
327       long seekTimestamp = kv.getTimestamp();
328       if (seekTimestamp > maxTimestampInFile) {
329         // Create a fake key that is not greater than the real next key.
330         // (Lower timestamps correspond to higher KVs.)
331         // To understand this better, consider that we are asked to seek to
332         // a higher timestamp than the max timestamp in this file. We know that
333         // the next point when we have to consider this file again is when we
334         // pass the max timestamp of this file (with the same row/column).
335         cur = kv.createFirstOnRowColTS(maxTimestampInFile);
336       } else {
337         // This will be the case e.g. when we need to seek to the next
338         // row/column, and we don't know exactly what they are, so we set the
339         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
340         // row/column.
341         enforceSeek();
342       }
343       return cur != null;
344     }
345 
346     // Multi-column Bloom filter optimization.
347     // Create a fake key/value, so that this scanner only bubbles up to the top
348     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
349     // all other store files. The query matcher will then just skip this fake
350     // key/value and the store scanner will progress to the next column. This
351     // is obviously not a "real real" seek, but unlike the fake KV earlier in
352     // this method, we want this to be propagated to ScanQueryMatcher.
353     cur = kv.createLastOnRowCol();
354 
355     realSeekDone = true;
356     return true;
357   }
358 
359   Reader getReaderForTesting() {
360     return reader;
361   }
362 
363   @Override
364   public boolean realSeekDone() {
365     return realSeekDone;
366   }
367 
368   @Override
369   public void enforceSeek() throws IOException {
370     if (realSeekDone)
371       return;
372 
373     if (delayedReseek) {
374       reseek(delayedSeekKV);
375     } else {
376       seek(delayedSeekKV);
377     }
378   }
379 
380   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
381     this.matcher = matcher;
382   }
383 
384   @Override
385   public boolean isFileScanner() {
386     return true;
387   }
388 
389   // Test methods
390 
391   static final long getSeekCount() {
392     return seekCount.get();
393   }
394   static final void instrument() {
395     seekCount = new AtomicLong();
396   }
397 
398   @Override
399   public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
400     return reader.passesTimerangeFilter(scan, oldestUnexpiredTS)
401         && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
402   }
403 
404   @Override
405   public boolean seekToPreviousRow(KeyValue key) throws IOException {
406     try {
407       try {
408         KeyValue seekKey = KeyValue.createFirstOnRow(key.getRow());
409         if (seekCount != null) seekCount.incrementAndGet();
410         if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
411             seekKey.getKeyLength())) {
412           close();
413           return false;
414         }
415         KeyValue firstKeyOfPreviousRow = KeyValue.createFirstOnRow(hfs
416             .getKeyValue().getRow());
417 
418         if (seekCount != null) seekCount.incrementAndGet();
419         if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
420           close();
421           return false;
422         }
423 
424         cur = hfs.getKeyValue();
425         this.stopSkippingKVsIfNextRow = true;
426         boolean resultOfSkipKVs;
427         try {
428           resultOfSkipKVs = skipKVsNewerThanReadpoint();
429         } finally {
430           this.stopSkippingKVsIfNextRow = false;
431         }
432         if (!resultOfSkipKVs
433             || Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(),
434                 cur.getRowLength(), firstKeyOfPreviousRow.getBuffer(),
435                 firstKeyOfPreviousRow.getRowOffset(),
436                 firstKeyOfPreviousRow.getRowLength()) > 0) {
437           return seekToPreviousRow(firstKeyOfPreviousRow);
438         }
439 
440         return true;
441       } finally {
442         realSeekDone = true;
443       }
444     } catch (IOException ioe) {
445       throw new IOException("Could not seekToPreviousRow " + this + " to key "
446           + key, ioe);
447     }
448   }
449 
450   @Override
451   public boolean seekToLastRow() throws IOException {
452     byte[] lastRow = reader.getLastRowKey();
453     if (lastRow == null) {
454       return false;
455     }
456     KeyValue seekKey = KeyValue.createFirstOnRow(lastRow);
457     if (seek(seekKey)) {
458       return true;
459     } else {
460       return seekToPreviousRow(seekKey);
461     }
462   }
463 
464   @Override
465   public boolean backwardSeek(KeyValue key) throws IOException {
466     seek(key);
467     if (cur == null
468         || Bytes.compareTo(cur.getRowArray(), cur.getRowOffset(),
469             cur.getRowLength(), key.getRowArray(), key.getRowOffset(),
470             key.getRowLength()) > 0) {
471       return seekToPreviousRow(key);
472     }
473     return true;
474   }
475 }