001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Comparator;
025import java.util.List;
026import java.util.PriorityQueue;
027
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * Implements a heap merge across any number of KeyValueScanners.
037 * <p>
038 * Implements KeyValueScanner itself.
039 * <p>
040 * This class is used at the Region level to merge across Stores
041 * and at the Store level to merge across the memstore and StoreFiles.
042 * <p>
043 * In the Region case, we also need InternalScanner.next(List), so this class
044 * also implements InternalScanner.  WARNING: As is, if you try to use this
045 * as an InternalScanner at the Store level, you will get runtime exceptions.
046 */
047@InterfaceAudience.Private
048public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
049    implements KeyValueScanner, InternalScanner {
050  private static final Logger LOG = LoggerFactory.getLogger(KeyValueHeap.class);
051  protected PriorityQueue<KeyValueScanner> heap = null;
052  // Holds the scanners when a ever a eager close() happens.  All such eagerly closed
053  // scans are collected and when the final scanner.close() happens will perform the
054  // actual close.
055  protected List<KeyValueScanner> scannersForDelayedClose = null;
056
057  /**
058   * The current sub-scanner, i.e. the one that contains the next key/value
059   * to return to the client. This scanner is NOT included in {@link #heap}
060   * (but we frequently add it back to the heap and pull the new winner out).
061   * We maintain an invariant that the current sub-scanner has already done
062   * a real seek, and that current.peek() is always a real key/value (or null)
063   * except for the fake last-key-on-row-column supplied by the multi-column
064   * Bloom filter optimization, which is OK to propagate to StoreScanner. In
065   * order to ensure that, always use {@link #pollRealKV()} to update current.
066   */
067  protected KeyValueScanner current = null;
068
069  protected KVScannerComparator comparator;
070
071  /**
072   * Constructor.  This KeyValueHeap will handle closing of passed in
073   * KeyValueScanners.
074   * @param scanners
075   * @param comparator
076   */
077  public KeyValueHeap(List<? extends KeyValueScanner> scanners,
078      CellComparator comparator) throws IOException {
079    this(scanners, new KVScannerComparator(comparator));
080  }
081
082  /**
083   * Constructor.
084   * @param scanners
085   * @param comparator
086   * @throws IOException
087   */
088  KeyValueHeap(List<? extends KeyValueScanner> scanners,
089      KVScannerComparator comparator) throws IOException {
090    this.comparator = comparator;
091    this.scannersForDelayedClose = new ArrayList<>(scanners.size());
092    if (!scanners.isEmpty()) {
093      this.heap = new PriorityQueue<>(scanners.size(), this.comparator);
094      for (KeyValueScanner scanner : scanners) {
095        if (scanner.peek() != null) {
096          this.heap.add(scanner);
097        } else {
098          this.scannersForDelayedClose.add(scanner);
099        }
100      }
101      this.current = pollRealKV();
102    }
103  }
104
105  @Override
106  public Cell peek() {
107    if (this.current == null) {
108      return null;
109    }
110    return this.current.peek();
111  }
112
113  boolean isLatestCellFromMemstore() {
114    return !this.current.isFileScanner();
115  }
116
117  @Override
118  public Cell next()  throws IOException {
119    if(this.current == null) {
120      return null;
121    }
122    Cell kvReturn = this.current.next();
123    Cell kvNext = this.current.peek();
124    if (kvNext == null) {
125      this.scannersForDelayedClose.add(this.current);
126      this.current = null;
127      this.current = pollRealKV();
128    } else {
129      KeyValueScanner topScanner = this.heap.peek();
130      // no need to add current back to the heap if it is the only scanner left
131      if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
132        this.heap.add(this.current);
133        this.current = null;
134        this.current = pollRealKV();
135      }
136    }
137    return kvReturn;
138  }
139
140  /**
141   * Gets the next row of keys from the top-most scanner.
142   * <p>
143   * This method takes care of updating the heap.
144   * <p>
145   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
146   * KeyValueScanner (a {@link StoreScanner}).
147   * @return true if more rows exist after this one, false if scanner is done
148   */
149  @Override
150  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
151    if (this.current == null) {
152      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
153    }
154    InternalScanner currentAsInternal = (InternalScanner)this.current;
155    boolean moreCells = currentAsInternal.next(result, scannerContext);
156    Cell pee = this.current.peek();
157
158    /*
159     * By definition, any InternalScanner must return false only when it has no
160     * further rows to be fetched. So, we can close a scanner if it returns
161     * false. All existing implementations seem to be fine with this. It is much
162     * more efficient to close scanners which are not needed than keep them in
163     * the heap. This is also required for certain optimizations.
164     */
165
166    if (pee == null || !moreCells) {
167      // add the scanner that is to be closed
168      this.scannersForDelayedClose.add(this.current);
169    } else {
170      this.heap.add(this.current);
171    }
172    this.current = null;
173    this.current = pollRealKV();
174    if (this.current == null) {
175      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
176    }
177    return moreCells;
178  }
179
180  protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
181    protected CellComparator kvComparator;
182    /**
183     * Constructor
184     * @param kvComparator
185     */
186    public KVScannerComparator(CellComparator kvComparator) {
187      this.kvComparator = kvComparator;
188    }
189
190    @Override
191    public int compare(KeyValueScanner left, KeyValueScanner right) {
192      int comparison = compare(left.peek(), right.peek());
193      if (comparison != 0) {
194        return comparison;
195      } else {
196        // Since both the keys are exactly the same, we break the tie in favor of higher ordered
197        // scanner since it'll have newer data. Since higher value should come first, we reverse
198        // sort here.
199        return Long.compare(right.getScannerOrder(), left.getScannerOrder());
200      }
201    }
202    /**
203     * Compares two KeyValue
204     * @param left
205     * @param right
206     * @return less than 0 if left is smaller, 0 if equal etc..
207     */
208    public int compare(Cell left, Cell right) {
209      return this.kvComparator.compare(left, right);
210    }
211    /**
212     * @return KVComparator
213     */
214    public CellComparator getComparator() {
215      return this.kvComparator;
216    }
217  }
218
219  @Override
220  public void close() {
221    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
222      scanner.close();
223    }
224    this.scannersForDelayedClose.clear();
225    if (this.current != null) {
226      this.current.close();
227    }
228    if (this.heap != null) {
229      // Order of closing the scanners shouldn't matter here, so simply iterate and close them.
230      for (KeyValueScanner scanner : heap) {
231        scanner.close();
232      }
233    }
234  }
235
236  /**
237   * Seeks all scanners at or below the specified seek key.  If we earlied-out
238   * of a row, we may end up skipping values that were never reached yet.
239   * Rather than iterating down, we want to give the opportunity to re-seek.
240   * <p>
241   * As individual scanners may run past their ends, those scanners are
242   * automatically closed and removed from the heap.
243   * <p>
244   * This function (and {@link #reseek(Cell)}) does not do multi-column
245   * Bloom filter and lazy-seek optimizations. To enable those, call
246   * {@link #requestSeek(Cell, boolean, boolean)}.
247   * @param seekKey KeyValue to seek at or after
248   * @return true if KeyValues exist at or after specified key, false if not
249   * @throws IOException
250   */
251  @Override
252  public boolean seek(Cell seekKey) throws IOException {
253    return generalizedSeek(false,    // This is not a lazy seek
254                           seekKey,
255                           false,    // forward (false: this is not a reseek)
256                           false);   // Not using Bloom filters
257  }
258
259  /**
260   * This function is identical to the {@link #seek(Cell)} function except
261   * that scanner.seek(seekKey) is changed to scanner.reseek(seekKey).
262   */
263  @Override
264  public boolean reseek(Cell seekKey) throws IOException {
265    return generalizedSeek(false,    // This is not a lazy seek
266                           seekKey,
267                           true,     // forward (true because this is reseek)
268                           false);   // Not using Bloom filters
269  }
270
271  /**
272   * {@inheritDoc}
273   */
274  @Override
275  public boolean requestSeek(Cell key, boolean forward,
276      boolean useBloom) throws IOException {
277    return generalizedSeek(true, key, forward, useBloom);
278  }
279
280  /**
281   * @param isLazy whether we are trying to seek to exactly the given row/col.
282   *          Enables Bloom filter and most-recent-file-first optimizations for
283   *          multi-column get/scan queries.
284   * @param seekKey key to seek to
285   * @param forward whether to seek forward (also known as reseek)
286   * @param useBloom whether to optimize seeks using Bloom filters
287   */
288  private boolean generalizedSeek(boolean isLazy, Cell seekKey,
289      boolean forward, boolean useBloom) throws IOException {
290    if (!isLazy && useBloom) {
291      throw new IllegalArgumentException("Multi-column Bloom filter " +
292          "optimization requires a lazy seek");
293    }
294
295    if (current == null) {
296      return false;
297    }
298
299    KeyValueScanner scanner = current;
300    try {
301      while (scanner != null) {
302        Cell topKey = scanner.peek();
303        if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
304          // Top KeyValue is at-or-after Seek KeyValue. We only know that all
305          // scanners are at or after seekKey (because fake keys of
306          // scanners where a lazy-seek operation has been done are not greater
307          // than their real next keys) but we still need to enforce our
308          // invariant that the top scanner has done a real seek. This way
309          // StoreScanner and RegionScanner do not have to worry about fake
310          // keys.
311          heap.add(scanner);
312          scanner = null;
313          current = pollRealKV();
314          return current != null;
315        }
316
317        boolean seekResult;
318        if (isLazy && heap.size() > 0) {
319          // If there is only one scanner left, we don't do lazy seek.
320          seekResult = scanner.requestSeek(seekKey, forward, useBloom);
321        } else {
322          seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey,
323              forward);
324        }
325
326        if (!seekResult) {
327          this.scannersForDelayedClose.add(scanner);
328        } else {
329          heap.add(scanner);
330        }
331        scanner = heap.poll();
332        if (scanner == null) {
333          current = null;
334        }
335      }
336    } catch (Exception e) {
337      if (scanner != null) {
338        try {
339          scanner.close();
340        } catch (Exception ce) {
341          LOG.warn("close KeyValueScanner error", ce);
342        }
343      }
344      throw e;
345    }
346
347    // Heap is returning empty, scanner is done
348    return false;
349  }
350
351  /**
352   * Fetches the top sub-scanner from the priority queue, ensuring that a real
353   * seek has been done on it. Works by fetching the top sub-scanner, and if it
354   * has not done a real seek, making it do so (which will modify its top KV),
355   * putting it back, and repeating this until success. Relies on the fact that
356   * on a lazy seek we set the current key of a StoreFileScanner to a KV that
357   * is not greater than the real next KV to be read from that file, so the
358   * scanner that bubbles up to the top of the heap will have global next KV in
359   * this scanner heap if (1) it has done a real seek and (2) its KV is the top
360   * among all top KVs (some of which are fake) in the scanner heap.
361   */
362  protected KeyValueScanner pollRealKV() throws IOException {
363    KeyValueScanner kvScanner = heap.poll();
364    if (kvScanner == null) {
365      return null;
366    }
367
368    while (kvScanner != null && !kvScanner.realSeekDone()) {
369      if (kvScanner.peek() != null) {
370        try {
371          kvScanner.enforceSeek();
372        } catch (IOException ioe) {
373          // Add the item to delayed close set in case it is leak from close
374          this.scannersForDelayedClose.add(kvScanner);
375          throw ioe;
376        }
377        Cell curKV = kvScanner.peek();
378        if (curKV != null) {
379          KeyValueScanner nextEarliestScanner = heap.peek();
380          if (nextEarliestScanner == null) {
381            // The heap is empty. Return the only possible scanner.
382            return kvScanner;
383          }
384
385          // Compare the current scanner to the next scanner. We try to avoid
386          // putting the current one back into the heap if possible.
387          Cell nextKV = nextEarliestScanner.peek();
388          if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
389            // We already have the scanner with the earliest KV, so return it.
390            return kvScanner;
391          }
392
393          // Otherwise, put the scanner back into the heap and let it compete
394          // against all other scanners (both those that have done a "real
395          // seek" and a "lazy seek").
396          heap.add(kvScanner);
397        } else {
398          // Close the scanner because we did a real seek and found out there
399          // are no more KVs.
400          this.scannersForDelayedClose.add(kvScanner);
401        }
402      } else {
403        // Close the scanner because it has already run out of KVs even before
404        // we had to do a real seek on it.
405        this.scannersForDelayedClose.add(kvScanner);
406      }
407      kvScanner = heap.poll();
408    }
409
410    return kvScanner;
411  }
412
413  /**
414   * @return the current Heap
415   */
416  public PriorityQueue<KeyValueScanner> getHeap() {
417    return this.heap;
418  }
419
420  KeyValueScanner getCurrentForTesting() {
421    return current;
422  }
423
424  @Override
425  public Cell getNextIndexedKey() {
426    // here we return the next index key from the top scanner
427    return current == null ? null : current.getNextIndexedKey();
428  }
429
430  @Override
431  public void shipped() throws IOException {
432    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
433      scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
434    }
435    this.scannersForDelayedClose.clear();
436    if (this.current != null) {
437      this.current.shipped();
438    }
439    if (this.heap != null) {
440      for (KeyValueScanner scanner : this.heap) {
441        scanner.shipped();
442      }
443    }
444  }
445}