001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.PriorityQueue;
025import java.util.function.IntConsumer;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.CellComparator;
028import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Implements a heap merge across any number of KeyValueScanners.
035 * <p>
036 * Implements KeyValueScanner itself.
037 * <p>
038 * This class is used at the Region level to merge across Stores and at the Store level to merge
039 * across the memstore and StoreFiles.
040 * <p>
041 * In the Region case, we also need InternalScanner.next(List), so this class also implements
042 * InternalScanner. WARNING: As is, if you try to use this as an InternalScanner at the Store level,
043 * you will get runtime exceptions.
044 */
045@InterfaceAudience.Private
046public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
047  implements KeyValueScanner, InternalScanner {
048  private static final Logger LOG = LoggerFactory.getLogger(KeyValueHeap.class);
049  protected PriorityQueue<KeyValueScanner> heap = null;
050  // Holds the scanners when a ever a eager close() happens. All such eagerly closed
051  // scans are collected and when the final scanner.close() happens will perform the
052  // actual close.
053  protected List<KeyValueScanner> scannersForDelayedClose = null;
054
055  /**
056   * The current sub-scanner, i.e. the one that contains the next key/value to return to the client.
057   * This scanner is NOT included in {@link #heap} (but we frequently add it back to the heap and
058   * pull the new winner out). We maintain an invariant that the current sub-scanner has already
059   * done a real seek, and that current.peek() is always a real key/value (or null) except for the
060   * fake last-key-on-row-column supplied by the multi-column Bloom filter optimization, which is OK
061   * to propagate to StoreScanner. In order to ensure that, always use {@link #pollRealKV()} to
062   * update current.
063   */
064  protected KeyValueScanner current = null;
065
066  protected KVScannerComparator comparator;
067
068  /**
069   * Constructor. This KeyValueHeap will handle closing of passed in KeyValueScanners.
070   */
071  public KeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
072    throws IOException {
073    this(scanners, new KVScannerComparator(comparator));
074  }
075
076  /**
077   * Constructor.
078   */
079  KeyValueHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
080    throws IOException {
081    this.comparator = comparator;
082    this.scannersForDelayedClose = new ArrayList<>(scanners.size());
083    if (!scanners.isEmpty()) {
084      this.heap = new PriorityQueue<>(scanners.size(), this.comparator);
085      for (KeyValueScanner scanner : scanners) {
086        if (scanner.peek() != null) {
087          this.heap.add(scanner);
088        } else {
089          this.scannersForDelayedClose.add(scanner);
090        }
091      }
092      this.current = pollRealKV();
093    }
094  }
095
096  @Override
097  public Cell peek() {
098    if (this.current == null) {
099      return null;
100    }
101    return this.current.peek();
102  }
103
104  boolean isLatestCellFromMemstore() {
105    return !this.current.isFileScanner();
106  }
107
108  @Override
109  public void recordBlockSize(IntConsumer blockSizeConsumer) {
110    this.current.recordBlockSize(blockSizeConsumer);
111  }
112
113  @Override
114  public Cell next() throws IOException {
115    if (this.current == null) {
116      return null;
117    }
118    Cell kvReturn = this.current.next();
119    Cell kvNext = this.current.peek();
120    if (kvNext == null) {
121      this.scannersForDelayedClose.add(this.current);
122      this.current = null;
123      this.current = pollRealKV();
124    } else {
125      KeyValueScanner topScanner = this.heap.peek();
126      // no need to add current back to the heap if it is the only scanner left
127      if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
128        this.heap.add(this.current);
129        this.current = null;
130        this.current = pollRealKV();
131      }
132    }
133    return kvReturn;
134  }
135
136  /**
137   * Gets the next row of keys from the top-most scanner.
138   * <p>
139   * This method takes care of updating the heap.
140   * <p>
141   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
142   * KeyValueScanner (a {@link StoreScanner}).
143   * @return true if more rows exist after this one, false if scanner is done
144   */
145  @Override
146  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
147    if (this.current == null) {
148      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
149    }
150    InternalScanner currentAsInternal = (InternalScanner) this.current;
151    boolean moreCells = currentAsInternal.next(result, scannerContext);
152    Cell pee = this.current.peek();
153
154    /*
155     * By definition, any InternalScanner must return false only when it has no further rows to be
156     * fetched. So, we can close a scanner if it returns false. All existing implementations seem to
157     * be fine with this. It is much more efficient to close scanners which are not needed than keep
158     * them in the heap. This is also required for certain optimizations.
159     */
160
161    if (pee == null || !moreCells) {
162      // add the scanner that is to be closed
163      this.scannersForDelayedClose.add(this.current);
164    } else {
165      this.heap.add(this.current);
166    }
167    this.current = null;
168    this.current = pollRealKV();
169    if (this.current == null) {
170      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
171    }
172    return moreCells;
173  }
174
175  protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
176    protected CellComparator kvComparator;
177
178    /**
179     * Constructor
180     */
181    public KVScannerComparator(CellComparator kvComparator) {
182      this.kvComparator = kvComparator;
183    }
184
185    @Override
186    public int compare(KeyValueScanner left, KeyValueScanner right) {
187      int comparison = compare(left.peek(), right.peek());
188      if (comparison != 0) {
189        return comparison;
190      } else {
191        // Since both the keys are exactly the same, we break the tie in favor of higher ordered
192        // scanner since it'll have newer data. Since higher value should come first, we reverse
193        // sort here.
194        return Long.compare(right.getScannerOrder(), left.getScannerOrder());
195      }
196    }
197
198    /**
199     * Compares two KeyValue
200     * @return less than 0 if left is smaller, 0 if equal etc..
201     */
202    public int compare(Cell left, Cell right) {
203      return this.kvComparator.compare(left, right);
204    }
205
206    /**
207     *     */
208    public CellComparator getComparator() {
209      return this.kvComparator;
210    }
211  }
212
213  @Override
214  public void close() {
215    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
216      scanner.close();
217    }
218    this.scannersForDelayedClose.clear();
219    if (this.current != null) {
220      this.current.close();
221    }
222    if (this.heap != null) {
223      // Order of closing the scanners shouldn't matter here, so simply iterate and close them.
224      for (KeyValueScanner scanner : heap) {
225        scanner.close();
226      }
227    }
228  }
229
230  /**
231   * Seeks all scanners at or below the specified seek key. If we earlied-out of a row, we may end
232   * up skipping values that were never reached yet. Rather than iterating down, we want to give the
233   * opportunity to re-seek.
234   * <p>
235   * As individual scanners may run past their ends, those scanners are automatically closed and
236   * removed from the heap.
237   * <p>
238   * This function (and {@link #reseek(Cell)}) does not do multi-column Bloom filter and lazy-seek
239   * optimizations. To enable those, call {@link #requestSeek(Cell, boolean, boolean)}.
240   * @param seekKey KeyValue to seek at or after
241   * @return true if KeyValues exist at or after specified key, false if not
242   */
243  @Override
244  public boolean seek(Cell seekKey) throws IOException {
245    return generalizedSeek(false, // This is not a lazy seek
246      seekKey, false, // forward (false: this is not a reseek)
247      false); // Not using Bloom filters
248  }
249
250  /**
251   * This function is identical to the {@link #seek(Cell)} function except that
252   * scanner.seek(seekKey) is changed to scanner.reseek(seekKey).
253   */
254  @Override
255  public boolean reseek(Cell seekKey) throws IOException {
256    return generalizedSeek(false, // This is not a lazy seek
257      seekKey, true, // forward (true because this is reseek)
258      false); // Not using Bloom filters
259  }
260
261  /**
262   * {@inheritDoc}
263   */
264  @Override
265  public boolean requestSeek(Cell key, boolean forward, boolean useBloom) throws IOException {
266    return generalizedSeek(true, key, forward, useBloom);
267  }
268
269  /**
270   * @param isLazy   whether we are trying to seek to exactly the given row/col. Enables Bloom
271   *                 filter and most-recent-file-first optimizations for multi-column get/scan
272   *                 queries.
273   * @param seekKey  key to seek to
274   * @param forward  whether to seek forward (also known as reseek)
275   * @param useBloom whether to optimize seeks using Bloom filters
276   */
277  private boolean generalizedSeek(boolean isLazy, Cell seekKey, boolean forward, boolean useBloom)
278    throws IOException {
279    if (!isLazy && useBloom) {
280      throw new IllegalArgumentException(
281        "Multi-column Bloom filter " + "optimization requires a lazy seek");
282    }
283
284    if (current == null) {
285      return false;
286    }
287
288    KeyValueScanner scanner = current;
289    try {
290      while (scanner != null) {
291        Cell topKey = scanner.peek();
292        if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
293          // Top KeyValue is at-or-after Seek KeyValue. We only know that all
294          // scanners are at or after seekKey (because fake keys of
295          // scanners where a lazy-seek operation has been done are not greater
296          // than their real next keys) but we still need to enforce our
297          // invariant that the top scanner has done a real seek. This way
298          // StoreScanner and RegionScanner do not have to worry about fake
299          // keys.
300          heap.add(scanner);
301          scanner = null;
302          current = pollRealKV();
303          return current != null;
304        }
305
306        boolean seekResult;
307        if (isLazy && heap.size() > 0) {
308          // If there is only one scanner left, we don't do lazy seek.
309          seekResult = scanner.requestSeek(seekKey, forward, useBloom);
310        } else {
311          seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey, forward);
312        }
313
314        if (!seekResult) {
315          this.scannersForDelayedClose.add(scanner);
316        } else {
317          heap.add(scanner);
318        }
319        scanner = heap.poll();
320        if (scanner == null) {
321          current = null;
322        }
323      }
324    } catch (Exception e) {
325      if (scanner != null) {
326        try {
327          scanner.close();
328        } catch (Exception ce) {
329          LOG.warn("close KeyValueScanner error", ce);
330        }
331      }
332      throw e;
333    }
334
335    // Heap is returning empty, scanner is done
336    return false;
337  }
338
339  /**
340   * Fetches the top sub-scanner from the priority queue, ensuring that a real seek has been done on
341   * it. Works by fetching the top sub-scanner, and if it has not done a real seek, making it do so
342   * (which will modify its top KV), putting it back, and repeating this until success. Relies on
343   * the fact that on a lazy seek we set the current key of a StoreFileScanner to a KV that is not
344   * greater than the real next KV to be read from that file, so the scanner that bubbles up to the
345   * top of the heap will have global next KV in this scanner heap if (1) it has done a real seek
346   * and (2) its KV is the top among all top KVs (some of which are fake) in the scanner heap.
347   */
348  protected KeyValueScanner pollRealKV() throws IOException {
349    KeyValueScanner kvScanner = heap.poll();
350    if (kvScanner == null) {
351      return null;
352    }
353
354    while (kvScanner != null && !kvScanner.realSeekDone()) {
355      if (kvScanner.peek() != null) {
356        try {
357          kvScanner.enforceSeek();
358        } catch (IOException ioe) {
359          // Add the item to delayed close set in case it is leak from close
360          this.scannersForDelayedClose.add(kvScanner);
361          throw ioe;
362        }
363        Cell curKV = kvScanner.peek();
364        if (curKV != null) {
365          KeyValueScanner nextEarliestScanner = heap.peek();
366          if (nextEarliestScanner == null) {
367            // The heap is empty. Return the only possible scanner.
368            return kvScanner;
369          }
370
371          // Compare the current scanner to the next scanner. We try to avoid
372          // putting the current one back into the heap if possible.
373          Cell nextKV = nextEarliestScanner.peek();
374          if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
375            // We already have the scanner with the earliest KV, so return it.
376            return kvScanner;
377          }
378
379          // Otherwise, put the scanner back into the heap and let it compete
380          // against all other scanners (both those that have done a "real
381          // seek" and a "lazy seek").
382          heap.add(kvScanner);
383        } else {
384          // Close the scanner because we did a real seek and found out there
385          // are no more KVs.
386          this.scannersForDelayedClose.add(kvScanner);
387        }
388      } else {
389        // Close the scanner because it has already run out of KVs even before
390        // we had to do a real seek on it.
391        this.scannersForDelayedClose.add(kvScanner);
392      }
393      kvScanner = heap.poll();
394    }
395
396    return kvScanner;
397  }
398
399  /** Returns the current Heap */
400  public PriorityQueue<KeyValueScanner> getHeap() {
401    return this.heap;
402  }
403
404  KeyValueScanner getCurrentForTesting() {
405    return current;
406  }
407
408  @Override
409  public Cell getNextIndexedKey() {
410    // here we return the next index key from the top scanner
411    return current == null ? null : current.getNextIndexedKey();
412  }
413
414  @Override
415  public void shipped() throws IOException {
416    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
417      scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
418    }
419    this.scannersForDelayedClose.clear();
420    if (this.current != null) {
421      this.current.shipped();
422    }
423    if (this.heap != null) {
424      for (KeyValueScanner scanner : this.heap) {
425        scanner.shipped();
426      }
427    }
428  }
429}