001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.List;
024import java.util.PriorityQueue;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellComparator;
027import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Implements a heap merge across any number of KeyValueScanners.
034 * <p>
035 * Implements KeyValueScanner itself.
036 * <p>
037 * This class is used at the Region level to merge across Stores and at the Store level to merge
038 * across the memstore and StoreFiles.
039 * <p>
040 * In the Region case, we also need InternalScanner.next(List), so this class also implements
041 * InternalScanner. WARNING: As is, if you try to use this as an InternalScanner at the Store level,
042 * you will get runtime exceptions.
043 */
044@InterfaceAudience.Private
045public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
046  implements KeyValueScanner, InternalScanner {
047  private static final Logger LOG = LoggerFactory.getLogger(KeyValueHeap.class);
048  protected PriorityQueue<KeyValueScanner> heap = null;
049  // Holds the scanners when a ever a eager close() happens. All such eagerly closed
050  // scans are collected and when the final scanner.close() happens will perform the
051  // actual close.
052  protected List<KeyValueScanner> scannersForDelayedClose = null;
053
054  /**
055   * The current sub-scanner, i.e. the one that contains the next key/value to return to the client.
056   * This scanner is NOT included in {@link #heap} (but we frequently add it back to the heap and
057   * pull the new winner out). We maintain an invariant that the current sub-scanner has already
058   * done a real seek, and that current.peek() is always a real key/value (or null) except for the
059   * fake last-key-on-row-column supplied by the multi-column Bloom filter optimization, which is OK
060   * to propagate to StoreScanner. In order to ensure that, always use {@link #pollRealKV()} to
061   * update current.
062   */
063  protected KeyValueScanner current = null;
064
065  protected KVScannerComparator comparator;
066
067  /**
068   * Constructor. This KeyValueHeap will handle closing of passed in KeyValueScanners. nn
069   */
070  public KeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
071    throws IOException {
072    this(scanners, new KVScannerComparator(comparator));
073  }
074
075  /**
076   * Constructor. nnn
077   */
078  KeyValueHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
079    throws IOException {
080    this.comparator = comparator;
081    this.scannersForDelayedClose = new ArrayList<>(scanners.size());
082    if (!scanners.isEmpty()) {
083      this.heap = new PriorityQueue<>(scanners.size(), this.comparator);
084      for (KeyValueScanner scanner : scanners) {
085        if (scanner.peek() != null) {
086          this.heap.add(scanner);
087        } else {
088          this.scannersForDelayedClose.add(scanner);
089        }
090      }
091      this.current = pollRealKV();
092    }
093  }
094
095  @Override
096  public Cell peek() {
097    if (this.current == null) {
098      return null;
099    }
100    return this.current.peek();
101  }
102
103  boolean isLatestCellFromMemstore() {
104    return !this.current.isFileScanner();
105  }
106
107  @Override
108  public Cell next() throws IOException {
109    if (this.current == null) {
110      return null;
111    }
112    Cell kvReturn = this.current.next();
113    Cell kvNext = this.current.peek();
114    if (kvNext == null) {
115      this.scannersForDelayedClose.add(this.current);
116      this.current = null;
117      this.current = pollRealKV();
118    } else {
119      KeyValueScanner topScanner = this.heap.peek();
120      // no need to add current back to the heap if it is the only scanner left
121      if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
122        this.heap.add(this.current);
123        this.current = null;
124        this.current = pollRealKV();
125      }
126    }
127    return kvReturn;
128  }
129
130  /**
131   * Gets the next row of keys from the top-most scanner.
132   * <p>
133   * This method takes care of updating the heap.
134   * <p>
135   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
136   * KeyValueScanner (a {@link StoreScanner}).
137   * @return true if more rows exist after this one, false if scanner is done
138   */
139  @Override
140  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
141    if (this.current == null) {
142      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
143    }
144    InternalScanner currentAsInternal = (InternalScanner) this.current;
145    boolean moreCells = currentAsInternal.next(result, scannerContext);
146    Cell pee = this.current.peek();
147
148    /*
149     * By definition, any InternalScanner must return false only when it has no further rows to be
150     * fetched. So, we can close a scanner if it returns false. All existing implementations seem to
151     * be fine with this. It is much more efficient to close scanners which are not needed than keep
152     * them in the heap. This is also required for certain optimizations.
153     */
154
155    if (pee == null || !moreCells) {
156      // add the scanner that is to be closed
157      this.scannersForDelayedClose.add(this.current);
158    } else {
159      this.heap.add(this.current);
160    }
161    this.current = null;
162    this.current = pollRealKV();
163    if (this.current == null) {
164      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
165    }
166    return moreCells;
167  }
168
169  protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
170    protected CellComparator kvComparator;
171
172    /**
173     * Constructor n
174     */
175    public KVScannerComparator(CellComparator kvComparator) {
176      this.kvComparator = kvComparator;
177    }
178
179    @Override
180    public int compare(KeyValueScanner left, KeyValueScanner right) {
181      int comparison = compare(left.peek(), right.peek());
182      if (comparison != 0) {
183        return comparison;
184      } else {
185        // Since both the keys are exactly the same, we break the tie in favor of higher ordered
186        // scanner since it'll have newer data. Since higher value should come first, we reverse
187        // sort here.
188        return Long.compare(right.getScannerOrder(), left.getScannerOrder());
189      }
190    }
191
192    /**
193     * Compares two KeyValue nn * @return less than 0 if left is smaller, 0 if equal etc..
194     */
195    public int compare(Cell left, Cell right) {
196      return this.kvComparator.compare(left, right);
197    }
198
199    /**
200     * n
201     */
202    public CellComparator getComparator() {
203      return this.kvComparator;
204    }
205  }
206
207  @Override
208  public void close() {
209    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
210      scanner.close();
211    }
212    this.scannersForDelayedClose.clear();
213    if (this.current != null) {
214      this.current.close();
215    }
216    if (this.heap != null) {
217      // Order of closing the scanners shouldn't matter here, so simply iterate and close them.
218      for (KeyValueScanner scanner : heap) {
219        scanner.close();
220      }
221    }
222  }
223
224  /**
225   * Seeks all scanners at or below the specified seek key. If we earlied-out of a row, we may end
226   * up skipping values that were never reached yet. Rather than iterating down, we want to give the
227   * opportunity to re-seek.
228   * <p>
229   * As individual scanners may run past their ends, those scanners are automatically closed and
230   * removed from the heap.
231   * <p>
232   * This function (and {@link #reseek(Cell)}) does not do multi-column Bloom filter and lazy-seek
233   * optimizations. To enable those, call {@link #requestSeek(Cell, boolean, boolean)}.
234   * @param seekKey KeyValue to seek at or after
235   * @return true if KeyValues exist at or after specified key, false if not n
236   */
237  @Override
238  public boolean seek(Cell seekKey) throws IOException {
239    return generalizedSeek(false, // This is not a lazy seek
240      seekKey, false, // forward (false: this is not a reseek)
241      false); // Not using Bloom filters
242  }
243
244  /**
245   * This function is identical to the {@link #seek(Cell)} function except that
246   * scanner.seek(seekKey) is changed to scanner.reseek(seekKey).
247   */
248  @Override
249  public boolean reseek(Cell seekKey) throws IOException {
250    return generalizedSeek(false, // This is not a lazy seek
251      seekKey, true, // forward (true because this is reseek)
252      false); // Not using Bloom filters
253  }
254
255  /**
256   * {@inheritDoc}
257   */
258  @Override
259  public boolean requestSeek(Cell key, boolean forward, boolean useBloom) throws IOException {
260    return generalizedSeek(true, key, forward, useBloom);
261  }
262
263  /**
264   * @param isLazy   whether we are trying to seek to exactly the given row/col. Enables Bloom
265   *                 filter and most-recent-file-first optimizations for multi-column get/scan
266   *                 queries.
267   * @param seekKey  key to seek to
268   * @param forward  whether to seek forward (also known as reseek)
269   * @param useBloom whether to optimize seeks using Bloom filters
270   */
271  private boolean generalizedSeek(boolean isLazy, Cell seekKey, boolean forward, boolean useBloom)
272    throws IOException {
273    if (!isLazy && useBloom) {
274      throw new IllegalArgumentException(
275        "Multi-column Bloom filter " + "optimization requires a lazy seek");
276    }
277
278    if (current == null) {
279      return false;
280    }
281
282    KeyValueScanner scanner = current;
283    try {
284      while (scanner != null) {
285        Cell topKey = scanner.peek();
286        if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
287          // Top KeyValue is at-or-after Seek KeyValue. We only know that all
288          // scanners are at or after seekKey (because fake keys of
289          // scanners where a lazy-seek operation has been done are not greater
290          // than their real next keys) but we still need to enforce our
291          // invariant that the top scanner has done a real seek. This way
292          // StoreScanner and RegionScanner do not have to worry about fake
293          // keys.
294          heap.add(scanner);
295          scanner = null;
296          current = pollRealKV();
297          return current != null;
298        }
299
300        boolean seekResult;
301        if (isLazy && heap.size() > 0) {
302          // If there is only one scanner left, we don't do lazy seek.
303          seekResult = scanner.requestSeek(seekKey, forward, useBloom);
304        } else {
305          seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey, forward);
306        }
307
308        if (!seekResult) {
309          this.scannersForDelayedClose.add(scanner);
310        } else {
311          heap.add(scanner);
312        }
313        scanner = heap.poll();
314        if (scanner == null) {
315          current = null;
316        }
317      }
318    } catch (Exception e) {
319      if (scanner != null) {
320        try {
321          scanner.close();
322        } catch (Exception ce) {
323          LOG.warn("close KeyValueScanner error", ce);
324        }
325      }
326      throw e;
327    }
328
329    // Heap is returning empty, scanner is done
330    return false;
331  }
332
333  /**
334   * Fetches the top sub-scanner from the priority queue, ensuring that a real seek has been done on
335   * it. Works by fetching the top sub-scanner, and if it has not done a real seek, making it do so
336   * (which will modify its top KV), putting it back, and repeating this until success. Relies on
337   * the fact that on a lazy seek we set the current key of a StoreFileScanner to a KV that is not
338   * greater than the real next KV to be read from that file, so the scanner that bubbles up to the
339   * top of the heap will have global next KV in this scanner heap if (1) it has done a real seek
340   * and (2) its KV is the top among all top KVs (some of which are fake) in the scanner heap.
341   */
342  protected KeyValueScanner pollRealKV() throws IOException {
343    KeyValueScanner kvScanner = heap.poll();
344    if (kvScanner == null) {
345      return null;
346    }
347
348    while (kvScanner != null && !kvScanner.realSeekDone()) {
349      if (kvScanner.peek() != null) {
350        try {
351          kvScanner.enforceSeek();
352        } catch (IOException ioe) {
353          // Add the item to delayed close set in case it is leak from close
354          this.scannersForDelayedClose.add(kvScanner);
355          throw ioe;
356        }
357        Cell curKV = kvScanner.peek();
358        if (curKV != null) {
359          KeyValueScanner nextEarliestScanner = heap.peek();
360          if (nextEarliestScanner == null) {
361            // The heap is empty. Return the only possible scanner.
362            return kvScanner;
363          }
364
365          // Compare the current scanner to the next scanner. We try to avoid
366          // putting the current one back into the heap if possible.
367          Cell nextKV = nextEarliestScanner.peek();
368          if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
369            // We already have the scanner with the earliest KV, so return it.
370            return kvScanner;
371          }
372
373          // Otherwise, put the scanner back into the heap and let it compete
374          // against all other scanners (both those that have done a "real
375          // seek" and a "lazy seek").
376          heap.add(kvScanner);
377        } else {
378          // Close the scanner because we did a real seek and found out there
379          // are no more KVs.
380          this.scannersForDelayedClose.add(kvScanner);
381        }
382      } else {
383        // Close the scanner because it has already run out of KVs even before
384        // we had to do a real seek on it.
385        this.scannersForDelayedClose.add(kvScanner);
386      }
387      kvScanner = heap.poll();
388    }
389
390    return kvScanner;
391  }
392
393  /** Returns the current Heap */
394  public PriorityQueue<KeyValueScanner> getHeap() {
395    return this.heap;
396  }
397
398  KeyValueScanner getCurrentForTesting() {
399    return current;
400  }
401
402  @Override
403  public Cell getNextIndexedKey() {
404    // here we return the next index key from the top scanner
405    return current == null ? null : current.getNextIndexedKey();
406  }
407
408  @Override
409  public void shipped() throws IOException {
410    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
411      scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
412    }
413    this.scannersForDelayedClose.clear();
414    if (this.current != null) {
415      this.current.shipped();
416    }
417    if (this.heap != null) {
418      for (KeyValueScanner scanner : this.heap) {
419        scanner.shipped();
420      }
421    }
422  }
423}