001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver;
021
022import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Comparator;
027import java.util.List;
028import java.util.PriorityQueue;
029
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellComparator;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
036
037/**
038 * Implements a heap merge across any number of KeyValueScanners.
039 * <p>
040 * Implements KeyValueScanner itself.
041 * <p>
042 * This class is used at the Region level to merge across Stores
043 * and at the Store level to merge across the memstore and StoreFiles.
044 * <p>
045 * In the Region case, we also need InternalScanner.next(List), so this class
046 * also implements InternalScanner.  WARNING: As is, if you try to use this
047 * as an InternalScanner at the Store level, you will get runtime exceptions.
048 */
049@InterfaceAudience.Private
050public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
051    implements KeyValueScanner, InternalScanner {
052  private static final Logger LOG = LoggerFactory.getLogger(KeyValueHeap.class);
053  protected PriorityQueue<KeyValueScanner> heap = null;
054  // Holds the scanners when a ever a eager close() happens.  All such eagerly closed
055  // scans are collected and when the final scanner.close() happens will perform the
056  // actual close.
057  protected List<KeyValueScanner> scannersForDelayedClose = null;
058
059  /**
060   * The current sub-scanner, i.e. the one that contains the next key/value
061   * to return to the client. This scanner is NOT included in {@link #heap}
062   * (but we frequently add it back to the heap and pull the new winner out).
063   * We maintain an invariant that the current sub-scanner has already done
064   * a real seek, and that current.peek() is always a real key/value (or null)
065   * except for the fake last-key-on-row-column supplied by the multi-column
066   * Bloom filter optimization, which is OK to propagate to StoreScanner. In
067   * order to ensure that, always use {@link #pollRealKV()} to update current.
068   */
069  protected KeyValueScanner current = null;
070
071  protected KVScannerComparator comparator;
072
073  /**
074   * Constructor.  This KeyValueHeap will handle closing of passed in
075   * KeyValueScanners.
076   * @param scanners
077   * @param comparator
078   */
079  public KeyValueHeap(List<? extends KeyValueScanner> scanners,
080      CellComparator comparator) throws IOException {
081    this(scanners, new KVScannerComparator(comparator));
082  }
083
084  /**
085   * Constructor.
086   * @param scanners
087   * @param comparator
088   * @throws IOException
089   */
090  KeyValueHeap(List<? extends KeyValueScanner> scanners,
091      KVScannerComparator comparator) throws IOException {
092    this.comparator = comparator;
093    this.scannersForDelayedClose = new ArrayList<>(scanners.size());
094    if (!scanners.isEmpty()) {
095      this.heap = new PriorityQueue<>(scanners.size(), this.comparator);
096      for (KeyValueScanner scanner : scanners) {
097        if (scanner.peek() != null) {
098          this.heap.add(scanner);
099        } else {
100          this.scannersForDelayedClose.add(scanner);
101        }
102      }
103      this.current = pollRealKV();
104    }
105  }
106
107  @Override
108  public Cell peek() {
109    if (this.current == null) {
110      return null;
111    }
112    return this.current.peek();
113  }
114
115  boolean isLatestCellFromMemstore() {
116    return !this.current.isFileScanner();
117  }
118
119  @Override
120  public Cell next()  throws IOException {
121    if(this.current == null) {
122      return null;
123    }
124    Cell kvReturn = this.current.next();
125    Cell kvNext = this.current.peek();
126    if (kvNext == null) {
127      this.scannersForDelayedClose.add(this.current);
128      this.current = null;
129      this.current = pollRealKV();
130    } else {
131      KeyValueScanner topScanner = this.heap.peek();
132      // no need to add current back to the heap if it is the only scanner left
133      if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
134        this.heap.add(this.current);
135        this.current = null;
136        this.current = pollRealKV();
137      }
138    }
139    return kvReturn;
140  }
141
142  /**
143   * Gets the next row of keys from the top-most scanner.
144   * <p>
145   * This method takes care of updating the heap.
146   * <p>
147   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
148   * KeyValueScanner (a {@link StoreScanner}).
149   * @return true if more rows exist after this one, false if scanner is done
150   */
151  @Override
152  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
153    if (this.current == null) {
154      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
155    }
156    InternalScanner currentAsInternal = (InternalScanner)this.current;
157    boolean moreCells = currentAsInternal.next(result, scannerContext);
158    Cell pee = this.current.peek();
159
160    /*
161     * By definition, any InternalScanner must return false only when it has no
162     * further rows to be fetched. So, we can close a scanner if it returns
163     * false. All existing implementations seem to be fine with this. It is much
164     * more efficient to close scanners which are not needed than keep them in
165     * the heap. This is also required for certain optimizations.
166     */
167
168    if (pee == null || !moreCells) {
169      // add the scanner that is to be closed
170      this.scannersForDelayedClose.add(this.current);
171    } else {
172      this.heap.add(this.current);
173    }
174    this.current = null;
175    this.current = pollRealKV();
176    if (this.current == null) {
177      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
178    }
179    return moreCells;
180  }
181
182  protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
183    protected CellComparator kvComparator;
184    /**
185     * Constructor
186     * @param kvComparator
187     */
188    public KVScannerComparator(CellComparator kvComparator) {
189      this.kvComparator = kvComparator;
190    }
191
192    @Override
193    public int compare(KeyValueScanner left, KeyValueScanner right) {
194      int comparison = compare(left.peek(), right.peek());
195      if (comparison != 0) {
196        return comparison;
197      } else {
198        // Since both the keys are exactly the same, we break the tie in favor of higher ordered
199        // scanner since it'll have newer data. Since higher value should come first, we reverse
200        // sort here.
201        return Long.compare(right.getScannerOrder(), left.getScannerOrder());
202      }
203    }
204    /**
205     * Compares two KeyValue
206     * @param left
207     * @param right
208     * @return less than 0 if left is smaller, 0 if equal etc..
209     */
210    public int compare(Cell left, Cell right) {
211      return this.kvComparator.compare(left, right);
212    }
213    /**
214     * @return KVComparator
215     */
216    public CellComparator getComparator() {
217      return this.kvComparator;
218    }
219  }
220
221  @Override
222  public void close() {
223    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
224      scanner.close();
225    }
226    this.scannersForDelayedClose.clear();
227    if (this.current != null) {
228      this.current.close();
229    }
230    if (this.heap != null) {
231      // Order of closing the scanners shouldn't matter here, so simply iterate and close them.
232      for (KeyValueScanner scanner : heap) {
233        scanner.close();
234      }
235    }
236  }
237
238  /**
239   * Seeks all scanners at or below the specified seek key.  If we earlied-out
240   * of a row, we may end up skipping values that were never reached yet.
241   * Rather than iterating down, we want to give the opportunity to re-seek.
242   * <p>
243   * As individual scanners may run past their ends, those scanners are
244   * automatically closed and removed from the heap.
245   * <p>
246   * This function (and {@link #reseek(Cell)}) does not do multi-column
247   * Bloom filter and lazy-seek optimizations. To enable those, call
248   * {@link #requestSeek(Cell, boolean, boolean)}.
249   * @param seekKey KeyValue to seek at or after
250   * @return true if KeyValues exist at or after specified key, false if not
251   * @throws IOException
252   */
253  @Override
254  public boolean seek(Cell seekKey) throws IOException {
255    return generalizedSeek(false,    // This is not a lazy seek
256                           seekKey,
257                           false,    // forward (false: this is not a reseek)
258                           false);   // Not using Bloom filters
259  }
260
261  /**
262   * This function is identical to the {@link #seek(Cell)} function except
263   * that scanner.seek(seekKey) is changed to scanner.reseek(seekKey).
264   */
265  @Override
266  public boolean reseek(Cell seekKey) throws IOException {
267    return generalizedSeek(false,    // This is not a lazy seek
268                           seekKey,
269                           true,     // forward (true because this is reseek)
270                           false);   // Not using Bloom filters
271  }
272
273  /**
274   * {@inheritDoc}
275   */
276  @Override
277  public boolean requestSeek(Cell key, boolean forward,
278      boolean useBloom) throws IOException {
279    return generalizedSeek(true, key, forward, useBloom);
280  }
281
282  /**
283   * @param isLazy whether we are trying to seek to exactly the given row/col.
284   *          Enables Bloom filter and most-recent-file-first optimizations for
285   *          multi-column get/scan queries.
286   * @param seekKey key to seek to
287   * @param forward whether to seek forward (also known as reseek)
288   * @param useBloom whether to optimize seeks using Bloom filters
289   */
290  private boolean generalizedSeek(boolean isLazy, Cell seekKey,
291      boolean forward, boolean useBloom) throws IOException {
292    if (!isLazy && useBloom) {
293      throw new IllegalArgumentException("Multi-column Bloom filter " +
294          "optimization requires a lazy seek");
295    }
296
297    if (current == null) {
298      return false;
299    }
300
301    KeyValueScanner scanner = current;
302    try {
303      while (scanner != null) {
304        Cell topKey = scanner.peek();
305        if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
306          // Top KeyValue is at-or-after Seek KeyValue. We only know that all
307          // scanners are at or after seekKey (because fake keys of
308          // scanners where a lazy-seek operation has been done are not greater
309          // than their real next keys) but we still need to enforce our
310          // invariant that the top scanner has done a real seek. This way
311          // StoreScanner and RegionScanner do not have to worry about fake
312          // keys.
313          heap.add(scanner);
314          scanner = null;
315          current = pollRealKV();
316          return current != null;
317        }
318
319        boolean seekResult;
320        if (isLazy && heap.size() > 0) {
321          // If there is only one scanner left, we don't do lazy seek.
322          seekResult = scanner.requestSeek(seekKey, forward, useBloom);
323        } else {
324          seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey,
325              forward);
326        }
327
328        if (!seekResult) {
329          this.scannersForDelayedClose.add(scanner);
330        } else {
331          heap.add(scanner);
332        }
333        scanner = heap.poll();
334        if (scanner == null) {
335          current = null;
336        }
337      }
338    } catch (Exception e) {
339      if (scanner != null) {
340        try {
341          scanner.close();
342        } catch (Exception ce) {
343          LOG.warn("close KeyValueScanner error", ce);
344        }
345      }
346      throw e;
347    }
348
349    // Heap is returning empty, scanner is done
350    return false;
351  }
352
353  /**
354   * Fetches the top sub-scanner from the priority queue, ensuring that a real
355   * seek has been done on it. Works by fetching the top sub-scanner, and if it
356   * has not done a real seek, making it do so (which will modify its top KV),
357   * putting it back, and repeating this until success. Relies on the fact that
358   * on a lazy seek we set the current key of a StoreFileScanner to a KV that
359   * is not greater than the real next KV to be read from that file, so the
360   * scanner that bubbles up to the top of the heap will have global next KV in
361   * this scanner heap if (1) it has done a real seek and (2) its KV is the top
362   * among all top KVs (some of which are fake) in the scanner heap.
363   */
364  protected KeyValueScanner pollRealKV() throws IOException {
365    KeyValueScanner kvScanner = heap.poll();
366    if (kvScanner == null) {
367      return null;
368    }
369
370    while (kvScanner != null && !kvScanner.realSeekDone()) {
371      if (kvScanner.peek() != null) {
372        try {
373          kvScanner.enforceSeek();
374        } catch (IOException ioe) {
375          // Add the item to delayed close set in case it is leak from close
376          this.scannersForDelayedClose.add(kvScanner);
377          throw ioe;
378        }
379        Cell curKV = kvScanner.peek();
380        if (curKV != null) {
381          KeyValueScanner nextEarliestScanner = heap.peek();
382          if (nextEarliestScanner == null) {
383            // The heap is empty. Return the only possible scanner.
384            return kvScanner;
385          }
386
387          // Compare the current scanner to the next scanner. We try to avoid
388          // putting the current one back into the heap if possible.
389          Cell nextKV = nextEarliestScanner.peek();
390          if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
391            // We already have the scanner with the earliest KV, so return it.
392            return kvScanner;
393          }
394
395          // Otherwise, put the scanner back into the heap and let it compete
396          // against all other scanners (both those that have done a "real
397          // seek" and a "lazy seek").
398          heap.add(kvScanner);
399        } else {
400          // Close the scanner because we did a real seek and found out there
401          // are no more KVs.
402          this.scannersForDelayedClose.add(kvScanner);
403        }
404      } else {
405        // Close the scanner because it has already run out of KVs even before
406        // we had to do a real seek on it.
407        this.scannersForDelayedClose.add(kvScanner);
408      }
409      kvScanner = heap.poll();
410    }
411
412    return kvScanner;
413  }
414
415  /**
416   * @return the current Heap
417   */
418  public PriorityQueue<KeyValueScanner> getHeap() {
419    return this.heap;
420  }
421
422
423  @VisibleForTesting
424  KeyValueScanner getCurrentForTesting() {
425    return current;
426  }
427
428  @Override
429  public Cell getNextIndexedKey() {
430    // here we return the next index key from the top scanner
431    return current == null ? null : current.getNextIndexedKey();
432  }
433
434  @Override
435  public void shipped() throws IOException {
436    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
437      scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
438    }
439    this.scannersForDelayedClose.clear();
440    if (this.current != null) {
441      this.current.shipped();
442    }
443    if (this.heap != null) {
444      for (KeyValueScanner scanner : this.heap) {
445        scanner.shipped();
446      }
447    }
448  }
449}