View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.io.IOException;
24  import java.util.Comparator;
25  import java.util.List;
26  import java.util.PriorityQueue;
27  
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.KeyValue.KVComparator;
30  
31  /**
32   * Implements a heap merge across any number of KeyValueScanners.
33   * <p>
34   * Implements KeyValueScanner itself.
35   * <p>
36   * This class is used at the Region level to merge across Stores
37   * and at the Store level to merge across the memstore and StoreFiles.
38   * <p>
39   * In the Region case, we also need InternalScanner.next(List), so this class
40   * also implements InternalScanner.  WARNING: As is, if you try to use this
41   * as an InternalScanner at the Store level, you will get runtime exceptions.
42   */
43  public class KeyValueHeap extends NonLazyKeyValueScanner
44      implements KeyValueScanner, InternalScanner {
45    private PriorityQueue<KeyValueScanner> heap = null;
46  
47    /**
48     * The current sub-scanner, i.e. the one that contains the next key/value
49     * to return to the client. This scanner is NOT included in {@link #heap}
50     * (but we frequently add it back to the heap and pull the new winner out).
51     * We maintain an invariant that the current sub-scanner has already done
52     * a real seek, and that current.peek() is always a real key/value (or null)
53     * except for the fake last-key-on-row-column supplied by the multi-column
54     * Bloom filter optimization, which is OK to propagate to StoreScanner. In
55     * order to ensure that, always use {@link #pollRealKV()} to update current.
56     */
57    private KeyValueScanner current = null;
58  
59    private KVScannerComparator comparator;
60  
61    /**
62     * Constructor.  This KeyValueHeap will handle closing of passed in
63     * KeyValueScanners.
64     * @param scanners
65     * @param comparator
66     */
67    public KeyValueHeap(List<? extends KeyValueScanner> scanners,
68        KVComparator comparator) throws IOException {
69      this.comparator = new KVScannerComparator(comparator);
70      if (!scanners.isEmpty()) {
71        this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
72            this.comparator);
73        for (KeyValueScanner scanner : scanners) {
74          if (scanner.peek() != null) {
75            this.heap.add(scanner);
76          } else {
77            scanner.close();
78          }
79        }
80        this.current = pollRealKV();
81      }
82    }
83  
84    public KeyValue peek() {
85      if (this.current == null) {
86        return null;
87      }
88      return this.current.peek();
89    }
90  
91    public KeyValue next()  throws IOException {
92      if(this.current == null) {
93        return null;
94      }
95      KeyValue kvReturn = this.current.next();
96      KeyValue kvNext = this.current.peek();
97      if (kvNext == null) {
98        this.current.close();
99        this.current = pollRealKV();
100     } else {
101       KeyValueScanner topScanner = this.heap.peek();
102       // no need to add current back to the heap if it is the only scanner left
103       if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
104         this.heap.add(this.current);
105         this.current = pollRealKV();
106       }
107     }
108     return kvReturn;
109   }
110 
111   /**
112    * Gets the next row of keys from the top-most scanner.
113    * <p>
114    * This method takes care of updating the heap.
115    * <p>
116    * This can ONLY be called when you are using Scanners that implement
117    * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
118    * @param result
119    * @param limit
120    * @return true if there are more keys, false if all scanners are done
121    */
122   public boolean next(List<KeyValue> result, int limit) throws IOException {
123     return next(result, limit, null);
124   }
125 
126   /**
127    * Gets the next row of keys from the top-most scanner.
128    * <p>
129    * This method takes care of updating the heap.
130    * <p>
131    * This can ONLY be called when you are using Scanners that implement
132    * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
133    * @param result output result list
134    * @param limit limit on row count to get
135    * @param metric the metric name
136    * @return true if there are more keys, false if all scanners are done
137    */
138   public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
139     if (this.current == null) {
140       return false;
141     }
142     InternalScanner currentAsInternal = (InternalScanner)this.current;
143     boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
144     KeyValue pee = this.current.peek();
145     /*
146      * By definition, any InternalScanner must return false only when it has no
147      * further rows to be fetched. So, we can close a scanner if it returns
148      * false. All existing implementations seem to be fine with this. It is much
149      * more efficient to close scanners which are not needed than keep them in
150      * the heap. This is also required for certain optimizations.
151      */
152     if (pee == null || !mayContainMoreRows) {
153       this.current.close();
154     } else {
155       this.heap.add(this.current);
156     }
157     this.current = pollRealKV();
158     return (this.current != null);
159   }
160 
161   /**
162    * Gets the next row of keys from the top-most scanner.
163    * <p>
164    * This method takes care of updating the heap.
165    * <p>
166    * This can ONLY be called when you are using Scanners that implement
167    * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
168    * @param result
169    * @return true if there are more keys, false if all scanners are done
170    */
171   public boolean next(List<KeyValue> result) throws IOException {
172     return next(result, -1);
173   }
174 
175   @Override
176   public boolean next(List<KeyValue> result, String metric) throws IOException {
177     return next(result, -1, metric);
178   }
179 
180   private static class KVScannerComparator implements Comparator<KeyValueScanner> {
181     private KVComparator kvComparator;
182     /**
183      * Constructor
184      * @param kvComparator
185      */
186     public KVScannerComparator(KVComparator kvComparator) {
187       this.kvComparator = kvComparator;
188     }
189     public int compare(KeyValueScanner left, KeyValueScanner right) {
190       int comparison = compare(left.peek(), right.peek());
191       if (comparison != 0) {
192         return comparison;
193       } else {
194         // Since both the keys are exactly the same, we break the tie in favor
195         // of the key which came latest.
196         long leftSequenceID = left.getSequenceID();
197         long rightSequenceID = right.getSequenceID();
198         if (leftSequenceID > rightSequenceID) {
199           return -1;
200         } else if (leftSequenceID < rightSequenceID) {
201           return 1;
202         } else {
203           return 0;
204         }
205       }
206     }
207     /**
208      * Compares two KeyValue
209      * @param left
210      * @param right
211      * @return less than 0 if left is smaller, 0 if equal etc..
212      */
213     public int compare(KeyValue left, KeyValue right) {
214       return this.kvComparator.compare(left, right);
215     }
216     /**
217      * @return KVComparator
218      */
219     public KVComparator getComparator() {
220       return this.kvComparator;
221     }
222   }
223 
224   public void close() {
225     if (this.current != null) {
226       this.current.close();
227     }
228     if (this.heap != null) {
229       KeyValueScanner scanner;
230       while ((scanner = this.heap.poll()) != null) {
231         scanner.close();
232       }
233     }
234   }
235 
236   /**
237    * Seeks all scanners at or below the specified seek key.  If we earlied-out
238    * of a row, we may end up skipping values that were never reached yet.
239    * Rather than iterating down, we want to give the opportunity to re-seek.
240    * <p>
241    * As individual scanners may run past their ends, those scanners are
242    * automatically closed and removed from the heap.
243    * <p>
244    * This function (and {@link #reseek(KeyValue)}) does not do multi-column
245    * Bloom filter and lazy-seek optimizations. To enable those, call
246    * {@link #requestSeek(KeyValue, boolean, boolean)}.
247    * @param seekKey KeyValue to seek at or after
248    * @return true if KeyValues exist at or after specified key, false if not
249    * @throws IOException
250    */
251   @Override
252   public boolean seek(KeyValue seekKey) throws IOException {
253     return generalizedSeek(false,    // This is not a lazy seek
254                            seekKey,
255                            false,    // forward (false: this is not a reseek)
256                            false);   // Not using Bloom filters
257   }
258 
259   /**
260    * This function is identical to the {@link #seek(KeyValue)} function except
261    * that scanner.seek(seekKey) is changed to scanner.reseek(seekKey).
262    */
263   @Override
264   public boolean reseek(KeyValue seekKey) throws IOException {
265     return generalizedSeek(false,    // This is not a lazy seek
266                            seekKey,
267                            true,     // forward (true because this is reseek)
268                            false);   // Not using Bloom filters
269   }
270 
271   /**
272    * {@inheritDoc}
273    */
274   @Override
275   public boolean requestSeek(KeyValue key, boolean forward,
276       boolean useBloom) throws IOException {
277     return generalizedSeek(true, key, forward, useBloom);
278   }
279 
280   /**
281    * @param isLazy whether we are trying to seek to exactly the given row/col.
282    *          Enables Bloom filter and most-recent-file-first optimizations for
283    *          multi-column get/scan queries.
284    * @param seekKey key to seek to
285    * @param forward whether to seek forward (also known as reseek)
286    * @param useBloom whether to optimize seeks using Bloom filters
287    */
288   private boolean generalizedSeek(boolean isLazy, KeyValue seekKey,
289       boolean forward, boolean useBloom) throws IOException {
290     if (!isLazy && useBloom) {
291       throw new IllegalArgumentException("Multi-column Bloom filter " +
292           "optimization requires a lazy seek");
293     }
294 
295     if (current == null) {
296       return false;
297     }
298     heap.add(current);
299     current = null;
300 
301     KeyValueScanner scanner;
302     while ((scanner = heap.poll()) != null) {
303       KeyValue topKey = scanner.peek();
304       if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
305         // Top KeyValue is at-or-after Seek KeyValue. We only know that all
306         // scanners are at or after seekKey (because fake keys of
307         // scanners where a lazy-seek operation has been done are not greater
308         // than their real next keys) but we still need to enforce our
309         // invariant that the top scanner has done a real seek. This way
310         // StoreScanner and RegionScanner do not have to worry about fake keys.
311         heap.add(scanner);
312         current = pollRealKV();
313         return current != null;
314       }
315 
316       boolean seekResult;
317       if (isLazy && heap.size() > 0) {
318         // If there is only one scanner left, we don't do lazy seek.
319         seekResult = scanner.requestSeek(seekKey, forward, useBloom);
320       } else {
321         seekResult = NonLazyKeyValueScanner.doRealSeek(
322             scanner, seekKey, forward);
323       }
324 
325       if (!seekResult) {
326         scanner.close();
327       } else {
328         heap.add(scanner);
329       }
330     }
331 
332     // Heap is returning empty, scanner is done
333     return false;
334   }
335 
336   /**
337    * Fetches the top sub-scanner from the priority queue, ensuring that a real
338    * seek has been done on it. Works by fetching the top sub-scanner, and if it
339    * has not done a real seek, making it do so (which will modify its top KV),
340    * putting it back, and repeating this until success. Relies on the fact that
341    * on a lazy seek we set the current key of a StoreFileScanner to a KV that
342    * is not greater than the real next KV to be read from that file, so the
343    * scanner that bubbles up to the top of the heap will have global next KV in
344    * this scanner heap if (1) it has done a real seek and (2) its KV is the top
345    * among all top KVs (some of which are fake) in the scanner heap.
346    */
347   private KeyValueScanner pollRealKV() throws IOException {
348     KeyValueScanner kvScanner = heap.poll();
349     if (kvScanner == null) {
350       return null;
351     }
352 
353     while (kvScanner != null && !kvScanner.realSeekDone()) {
354       if (kvScanner.peek() != null) {
355         kvScanner.enforceSeek();
356         KeyValue curKV = kvScanner.peek();
357         if (curKV != null) {
358           KeyValueScanner nextEarliestScanner = heap.peek();
359           if (nextEarliestScanner == null) {
360             // The heap is empty. Return the only possible scanner.
361             return kvScanner;
362           }
363 
364           // Compare the current scanner to the next scanner. We try to avoid
365           // putting the current one back into the heap if possible.
366           KeyValue nextKV = nextEarliestScanner.peek();
367           if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
368             // We already have the scanner with the earliest KV, so return it.
369             return kvScanner;
370           }
371 
372           // Otherwise, put the scanner back into the heap and let it compete
373           // against all other scanners (both those that have done a "real
374           // seek" and a "lazy seek").
375           heap.add(kvScanner);
376         } else {
377           // Close the scanner because we did a real seek and found out there
378           // are no more KVs.
379           kvScanner.close();
380         }
381       } else {
382         // Close the scanner because it has already run out of KVs even before
383         // we had to do a real seek on it.
384         kvScanner.close();
385       }
386       kvScanner = heap.poll();
387     }
388 
389     return kvScanner;
390   }
391 
392   /**
393    * @return the current Heap
394    */
395   public PriorityQueue<KeyValueScanner> getHeap() {
396     return this.heap;
397   }
398 
399   @Override
400   public long getSequenceID() {
401     return 0;
402   }
403 
404   KeyValueScanner getCurrentForTesting() {
405     return current;
406   }
407 }