View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.io.FileNotFoundException;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.List;
28  import java.util.SortedSet;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.client.Scan;
35  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
36  import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
37  
38  /**
39   * KeyValueScanner adaptor over the Reader.  It also provides hooks into
40   * bloom filter things.
41   */
42  public class StoreFileScanner implements KeyValueScanner {
43    static final Log LOG = LogFactory.getLog(Store.class);
44  
45    // the reader it comes from:
46    private final StoreFile.Reader reader;
47    private final HFileScanner hfs;
48    private KeyValue cur = null;
49  
50    private boolean realSeekDone;
51    private boolean delayedReseek;
52    private KeyValue delayedSeekKV;
53  
54    private boolean enforceMVCC = false;
55    private boolean hasMVCCInfo = false;
56  
57    private static AtomicLong seekCount;
58  
59    private ScanQueryMatcher matcher;
60  
61    /**
62     * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
63     * @param hfs HFile scanner
64     */
65    public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, boolean hasMVCC) {
66      this.reader = reader;
67      this.hfs = hfs;
68      this.enforceMVCC = useMVCC;
69      this.hasMVCCInfo = hasMVCC;
70    }
71  
72    /**
73     * Return an array of scanners corresponding to the given
74     * set of store files.
75     */
76    public static List<StoreFileScanner> getScannersForStoreFiles(
77        Collection<StoreFile> files,
78        boolean cacheBlocks,
79        boolean usePread) throws IOException {
80      return getScannersForStoreFiles(files, cacheBlocks,
81                                     usePread, false);
82    }
83  
84    /**
85     * Return an array of scanners corresponding to the given set of store files.
86     */
87    public static List<StoreFileScanner> getScannersForStoreFiles(
88        Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
89        boolean isCompaction) throws IOException {
90      return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
91          null);
92    }
93  
94    /**
95     * Return an array of scanners corresponding to the given set of store files,
96     * And set the ScanQueryMatcher for each store file scanner for further
97     * optimization
98     */
99    public static List<StoreFileScanner> getScannersForStoreFiles(
100       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
101       boolean isCompaction, ScanQueryMatcher matcher) throws IOException {
102     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
103         files.size());
104     for (StoreFile file : files) {
105       StoreFile.Reader r = file.createReader();
106       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
107           isCompaction);
108       scanner.setScanQueryMatcher(matcher);
109       scanners.add(scanner);
110     }
111     return scanners;
112   }
113 
114   public String toString() {
115     return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
116   }
117 
118   public KeyValue peek() {
119     return cur;
120   }
121 
122   public KeyValue next() throws IOException {
123     KeyValue retKey = cur;
124 
125     try {
126       // only seek if we aren't at the end. cur == null implies 'end'.
127       if (cur != null) {
128         hfs.next();
129         cur = hfs.getKeyValue();
130         if (hasMVCCInfo)
131           skipKVsNewerThanReadpoint();
132       }
133     } catch (FileNotFoundException e) {
134       throw e;
135     } catch(IOException e) {
136       throw new IOException("Could not iterate " + this, e);
137     }
138     return retKey;
139   }
140 
141   public boolean seek(KeyValue key) throws IOException {
142     if (seekCount != null) seekCount.incrementAndGet();
143 
144     try {
145       try {
146         if(!seekAtOrAfter(hfs, key)) {
147           close();
148           return false;
149         }
150 
151         cur = hfs.getKeyValue();
152 
153         return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
154       } finally {
155         realSeekDone = true;
156       }
157     } catch (FileNotFoundException e) {
158       throw e;
159     } catch (IOException ioe) {
160       throw new IOException("Could not seek " + this + " to key " + key, ioe);
161     }
162   }
163 
164   public boolean reseek(KeyValue key) throws IOException {
165     if (seekCount != null) seekCount.incrementAndGet();
166 
167     try {
168       try {
169         if (!reseekAtOrAfter(hfs, key)) {
170           close();
171           return false;
172         }
173         cur = hfs.getKeyValue();
174 
175         return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
176       } finally {
177         realSeekDone = true;
178       }
179     } catch (FileNotFoundException e) {
180       throw e;
181     } catch (IOException ioe) {
182       throw new IOException("Could not reseek " + this + " to key " + key,
183           ioe);
184     }
185   }
186 
187   protected boolean skipKVsNewerThanReadpoint() throws IOException {
188     long readPoint = MultiVersionConsistencyControl.getThreadReadPoint();
189 
190     // We want to ignore all key-values that are newer than our current
191     // readPoint
192     while(enforceMVCC
193         && cur != null
194         && (cur.getMemstoreTS() > readPoint)) {
195       hfs.next();
196       cur = hfs.getKeyValue();
197     }
198 
199     if (cur == null) {
200       close();
201       return false;
202     }
203 
204     // For the optimisation in HBASE-4346, we set the KV's memstoreTS to
205     // 0, if it is older than all the scanners' read points. It is possible
206     // that a newer KV's memstoreTS was reset to 0. But, there is an
207     // older KV which was not reset to 0 (because it was
208     // not old enough during flush). Make sure that we set it correctly now,
209     // so that the comparision order does not change.
210     if (cur.getMemstoreTS() <= readPoint) {
211       cur.setMemstoreTS(0);
212     }
213     return true;
214   }
215 
216   public void close() {
217     // Nothing to close on HFileScanner?
218     cur = null;
219   }
220 
221   /**
222    *
223    * @param s
224    * @param k
225    * @return
226    * @throws IOException
227    */
228   public static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
229   throws IOException {
230     int result = s.seekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
231     if(result < 0) {
232       // Passed KV is smaller than first KV in file, work from start of file
233       return s.seekTo();
234     } else if(result > 0) {
235       // Passed KV is larger than current KV in file, if there is a next
236       // it is the "after", if not then this scanner is done.
237       return s.next();
238     }
239     // Seeked to the exact key
240     return true;
241   }
242 
243   static boolean reseekAtOrAfter(HFileScanner s, KeyValue k)
244   throws IOException {
245     //This function is similar to seekAtOrAfter function
246     int result = s.reseekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
247     if (result <= 0) {
248       // If up to now scanner is not seeked yet, this means passed KV is smaller
249       // than first KV in file, and it is the first time we seek on this file.
250       // So we also need to work from the start of file.
251       if (!s.isSeeked()) {
252         return  s.seekTo();
253       }
254       return true;
255     } else {
256       // passed KV is larger than current KV in file, if there is a next
257       // it is after, if not then this scanner is done.
258       return s.next();
259     }
260   }
261 
262   @Override
263   public long getSequenceID() {
264     return reader.getSequenceID();
265   }
266 
267   /**
268    * Pretend we have done a seek but don't do it yet, if possible. The hope is
269    * that we find requested columns in more recent files and won't have to seek
270    * in older files. Creates a fake key/value with the given row/column and the
271    * highest (most recent) possible timestamp we might get from this file. When
272    * users of such "lazy scanner" need to know the next KV precisely (e.g. when
273    * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
274    * <p>
275    * Note that this function does guarantee that the current KV of this scanner
276    * will be advanced to at least the given KV. Because of this, it does have
277    * to do a real seek in cases when the seek timestamp is older than the
278    * highest timestamp of the file, e.g. when we are trying to seek to the next
279    * row/column and use OLDEST_TIMESTAMP in the seek key.
280    */
281   @Override
282   public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom)
283       throws IOException {
284     if (kv.getFamilyLength() == 0) {
285       useBloom = false;
286     }
287 
288     boolean haveToSeek = true;
289     if (useBloom) {
290       // check ROWCOL Bloom filter first.
291       if (reader.getBloomFilterType() == StoreFile.BloomType.ROWCOL) {
292         haveToSeek = reader.passesGeneralBloomFilter(kv.getBuffer(),
293             kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
294             kv.getQualifierOffset(), kv.getQualifierLength());
295       } else if (this.matcher != null && !matcher.hasNullColumnInQuery() &&
296           kv.isDeleteFamily()) {
297         // if there is no such delete family kv in the store file,
298         // then no need to seek.
299         haveToSeek = reader.passesDeleteFamilyBloomFilter(kv.getBuffer(),
300             kv.getRowOffset(), kv.getRowLength());
301       }
302     }
303 
304     delayedReseek = forward;
305     delayedSeekKV = kv;
306 
307     if (haveToSeek) {
308       // This row/column might be in this store file (or we did not use the
309       // Bloom filter), so we still need to seek.
310       realSeekDone = false;
311       long maxTimestampInFile = reader.getMaxTimestamp();
312       long seekTimestamp = kv.getTimestamp();
313       if (seekTimestamp > maxTimestampInFile) {
314         // Create a fake key that is not greater than the real next key.
315         // (Lower timestamps correspond to higher KVs.)
316         // To understand this better, consider that we are asked to seek to
317         // a higher timestamp than the max timestamp in this file. We know that
318         // the next point when we have to consider this file again is when we
319         // pass the max timestamp of this file (with the same row/column).
320         cur = kv.createFirstOnRowColTS(maxTimestampInFile);
321       } else {
322         // This will be the case e.g. when we need to seek to the next
323         // row/column, and we don't know exactly what they are, so we set the
324         // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
325         // row/column.
326         enforceSeek();
327       }
328       return cur != null;
329     }
330 
331     // Multi-column Bloom filter optimization.
332     // Create a fake key/value, so that this scanner only bubbles up to the top
333     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
334     // all other store files. The query matcher will then just skip this fake
335     // key/value and the store scanner will progress to the next column. This
336     // is obviously not a "real real" seek, but unlike the fake KV earlier in
337     // this method, we want this to be propagated to ScanQueryMatcher.
338     cur = kv.createLastOnRowCol();
339 
340     realSeekDone = true;
341     return true;
342   }
343 
344   Reader getReaderForTesting() {
345     return reader;
346   }
347 
348   @Override
349   public boolean realSeekDone() {
350     return realSeekDone;
351   }
352 
353   @Override
354   public void enforceSeek() throws IOException {
355     if (realSeekDone)
356       return;
357 
358     if (delayedReseek) {
359       reseek(delayedSeekKV);
360     } else {
361       seek(delayedSeekKV);
362     }
363   }
364 
365   public void setScanQueryMatcher(ScanQueryMatcher matcher) {
366     this.matcher = matcher;
367   }
368 
369   @Override
370   public boolean isFileScanner() {
371     return true;
372   }
373 
374   // Test methods
375 
376   static final long getSeekCount() {
377     return seekCount.get();
378   }
379   static final void instrument() {
380     seekCount = new AtomicLong();
381   }
382 
383   @Override
384   public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
385     return reader.passesTimerangeFilter(scan, oldestUnexpiredTS)
386         && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
387   }
388 }