001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.HashSet;
025import java.util.List;
026import java.util.PriorityQueue;
027import java.util.Set;
028import java.util.function.IntConsumer;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellComparator;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Implements a heap merge across any number of KeyValueScanners.
040 * <p>
041 * Implements KeyValueScanner itself.
042 * <p>
043 * This class is used at the Region level to merge across Stores and at the Store level to merge
044 * across the memstore and StoreFiles.
045 * <p>
046 * In the Region case, we also need InternalScanner.next(List), so this class also implements
047 * InternalScanner. WARNING: As is, if you try to use this as an InternalScanner at the Store level,
048 * you will get runtime exceptions.
049 */
050@InterfaceAudience.Private
051public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
052  implements KeyValueScanner, InternalScanner {
053  private static final Logger LOG = LoggerFactory.getLogger(KeyValueHeap.class);
054  protected PriorityQueue<KeyValueScanner> heap = null;
055  // Holds the scanners when a ever a eager close() happens. All such eagerly closed
056  // scans are collected and when the final scanner.close() happens will perform the
057  // actual close.
058  protected List<KeyValueScanner> scannersForDelayedClose = null;
059
060  /**
061   * The current sub-scanner, i.e. the one that contains the next key/value to return to the client.
062   * This scanner is NOT included in {@link #heap} (but we frequently add it back to the heap and
063   * pull the new winner out). We maintain an invariant that the current sub-scanner has already
064   * done a real seek, and that current.peek() is always a real key/value (or null) except for the
065   * fake last-key-on-row-column supplied by the multi-column Bloom filter optimization, which is OK
066   * to propagate to StoreScanner. In order to ensure that, always use {@link #pollRealKV()} to
067   * update current.
068   */
069  protected KeyValueScanner current = null;
070
071  protected KVScannerComparator comparator;
072
073  private final Set<Path> filesRead = new HashSet<>();
074
075  /**
076   * Constructor. This KeyValueHeap will handle closing of passed in KeyValueScanners.
077   */
078  public KeyValueHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
079    throws IOException {
080    this(scanners, new KVScannerComparator(comparator));
081  }
082
083  /**
084   * Constructor.
085   */
086  KeyValueHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
087    throws IOException {
088    this.comparator = comparator;
089    this.scannersForDelayedClose = new ArrayList<>(scanners.size());
090    if (!scanners.isEmpty()) {
091      this.heap = new PriorityQueue<>(scanners.size(), this.comparator);
092      for (KeyValueScanner scanner : scanners) {
093        if (scanner.peek() != null) {
094          this.heap.add(scanner);
095        } else {
096          this.scannersForDelayedClose.add(scanner);
097        }
098      }
099      this.current = pollRealKV();
100    }
101  }
102
103  @Override
104  public ExtendedCell peek() {
105    if (this.current == null) {
106      return null;
107    }
108    return this.current.peek();
109  }
110
111  boolean isLatestCellFromMemstore() {
112    return !this.current.isFileScanner();
113  }
114
115  @Override
116  public void recordBlockSize(IntConsumer blockSizeConsumer) {
117    this.current.recordBlockSize(blockSizeConsumer);
118  }
119
120  @Override
121  public ExtendedCell next() throws IOException {
122    if (this.current == null) {
123      return null;
124    }
125    ExtendedCell kvReturn = this.current.next();
126    ExtendedCell kvNext = this.current.peek();
127    if (kvNext == null) {
128      this.scannersForDelayedClose.add(this.current);
129      this.current = null;
130      this.current = pollRealKV();
131    } else {
132      KeyValueScanner topScanner = this.heap.peek();
133      // no need to add current back to the heap if it is the only scanner left
134      if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
135        this.heap.add(this.current);
136        this.current = null;
137        this.current = pollRealKV();
138      }
139    }
140    return kvReturn;
141  }
142
143  /**
144   * Gets the next row of keys from the top-most scanner.
145   * <p>
146   * This method takes care of updating the heap.
147   * <p>
148   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
149   * KeyValueScanner (a {@link StoreScanner}).
150   * @return true if more rows exist after this one, false if scanner is done
151   */
152  @Override
153  public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
154    throws IOException {
155    if (this.current == null) {
156      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
157    }
158    InternalScanner currentAsInternal = (InternalScanner) this.current;
159    boolean moreCells = currentAsInternal.next(result, scannerContext);
160    Cell pee = this.current.peek();
161
162    /*
163     * By definition, any InternalScanner must return false only when it has no further rows to be
164     * fetched. So, we can close a scanner if it returns false. All existing implementations seem to
165     * be fine with this. It is much more efficient to close scanners which are not needed than keep
166     * them in the heap. This is also required for certain optimizations.
167     */
168
169    if (pee == null || !moreCells) {
170      // add the scanner that is to be closed
171      this.scannersForDelayedClose.add(this.current);
172    } else {
173      this.heap.add(this.current);
174    }
175    this.current = null;
176    this.current = pollRealKV();
177    if (this.current == null) {
178      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
179    }
180    return moreCells;
181  }
182
183  protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
184    protected CellComparator kvComparator;
185
186    /**
187     * Constructor
188     */
189    public KVScannerComparator(CellComparator kvComparator) {
190      this.kvComparator = kvComparator;
191    }
192
193    @Override
194    public int compare(KeyValueScanner left, KeyValueScanner right) {
195      int comparison = compare(left.peek(), right.peek());
196      if (comparison != 0) {
197        return comparison;
198      } else {
199        // Since both the keys are exactly the same, we break the tie in favor of higher ordered
200        // scanner since it'll have newer data. Since higher value should come first, we reverse
201        // sort here.
202        return Long.compare(right.getScannerOrder(), left.getScannerOrder());
203      }
204    }
205
206    /**
207     * Compares two KeyValue
208     * @return less than 0 if left is smaller, 0 if equal etc..
209     */
210    public int compare(Cell left, Cell right) {
211      return this.kvComparator.compare(left, right);
212    }
213
214    /**
215     *     */
216    public CellComparator getComparator() {
217      return this.kvComparator;
218    }
219  }
220
221  @Override
222  public void close() {
223    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
224      scanner.close();
225      filesRead.addAll(scanner.getFilesRead());
226    }
227    this.scannersForDelayedClose.clear();
228    if (this.current != null) {
229      this.current.close();
230      filesRead.addAll(this.current.getFilesRead());
231    }
232    if (this.heap != null) {
233      // Order of closing the scanners shouldn't matter here, so simply iterate and close them.
234      for (KeyValueScanner scanner : heap) {
235        scanner.close();
236        filesRead.addAll(scanner.getFilesRead());
237      }
238    }
239  }
240
241  /**
242   * Returns the set of store file paths successfully read by the scanners in this heap. Populated
243   * as each scanner is closed (e.g. in close() or shipped()).
244   */
245  @Override
246  public Set<Path> getFilesRead() {
247    return Collections.unmodifiableSet(filesRead);
248  }
249
250  /**
251   * Seeks all scanners at or below the specified seek key. If we earlied-out of a row, we may end
252   * up skipping values that were never reached yet. Rather than iterating down, we want to give the
253   * opportunity to re-seek.
254   * <p>
255   * As individual scanners may run past their ends, those scanners are automatically closed and
256   * removed from the heap.
257   * <p>
258   * This function (and {@link #reseek(ExtendedCell)}) does not do multi-column Bloom filter and
259   * lazy-seek optimizations. To enable those, call
260   * {@link #requestSeek(ExtendedCell, boolean, boolean)}.
261   * @param seekKey KeyValue to seek at or after
262   * @return true if KeyValues exist at or after specified key, false if not
263   */
264  @Override
265  public boolean seek(ExtendedCell seekKey) throws IOException {
266    return generalizedSeek(false, // This is not a lazy seek
267      seekKey, false, // forward (false: this is not a reseek)
268      false); // Not using Bloom filters
269  }
270
271  /**
272   * This function is identical to the {@link #seek(ExtendedCell)} function except that
273   * scanner.seek(seekKey) is changed to scanner.reseek(seekKey).
274   */
275  @Override
276  public boolean reseek(ExtendedCell seekKey) throws IOException {
277    return generalizedSeek(false, // This is not a lazy seek
278      seekKey, true, // forward (true because this is reseek)
279      false); // Not using Bloom filters
280  }
281
282  /**
283   * {@inheritDoc}
284   */
285  @Override
286  public boolean requestSeek(ExtendedCell key, boolean forward, boolean useBloom)
287    throws IOException {
288    return generalizedSeek(true, key, forward, useBloom);
289  }
290
291  /**
292   * @param isLazy   whether we are trying to seek to exactly the given row/col. Enables Bloom
293   *                 filter and most-recent-file-first optimizations for multi-column get/scan
294   *                 queries.
295   * @param seekKey  key to seek to
296   * @param forward  whether to seek forward (also known as reseek)
297   * @param useBloom whether to optimize seeks using Bloom filters
298   */
299  private boolean generalizedSeek(boolean isLazy, ExtendedCell seekKey, boolean forward,
300    boolean useBloom) throws IOException {
301    if (!isLazy && useBloom) {
302      throw new IllegalArgumentException(
303        "Multi-column Bloom filter " + "optimization requires a lazy seek");
304    }
305
306    if (current == null) {
307      return false;
308    }
309
310    KeyValueScanner scanner = current;
311    try {
312      while (scanner != null) {
313        Cell topKey = scanner.peek();
314        if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
315          // Top KeyValue is at-or-after Seek KeyValue. We only know that all
316          // scanners are at or after seekKey (because fake keys of
317          // scanners where a lazy-seek operation has been done are not greater
318          // than their real next keys) but we still need to enforce our
319          // invariant that the top scanner has done a real seek. This way
320          // StoreScanner and RegionScanner do not have to worry about fake
321          // keys.
322          heap.add(scanner);
323          scanner = null;
324          current = pollRealKV();
325          return current != null;
326        }
327
328        boolean seekResult;
329        if (isLazy && heap.size() > 0) {
330          // If there is only one scanner left, we don't do lazy seek.
331          seekResult = scanner.requestSeek(seekKey, forward, useBloom);
332        } else {
333          seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey, forward);
334        }
335
336        if (!seekResult) {
337          this.scannersForDelayedClose.add(scanner);
338        } else {
339          heap.add(scanner);
340        }
341        scanner = heap.poll();
342        if (scanner == null) {
343          current = null;
344        }
345      }
346    } catch (Exception e) {
347      if (scanner != null) {
348        try {
349          scanner.close();
350        } catch (Exception ce) {
351          LOG.warn("close KeyValueScanner error", ce);
352        }
353      }
354      throw e;
355    }
356
357    // Heap is returning empty, scanner is done
358    return false;
359  }
360
361  /**
362   * Fetches the top sub-scanner from the priority queue, ensuring that a real seek has been done on
363   * it. Works by fetching the top sub-scanner, and if it has not done a real seek, making it do so
364   * (which will modify its top KV), putting it back, and repeating this until success. Relies on
365   * the fact that on a lazy seek we set the current key of a StoreFileScanner to a KV that is not
366   * greater than the real next KV to be read from that file, so the scanner that bubbles up to the
367   * top of the heap will have global next KV in this scanner heap if (1) it has done a real seek
368   * and (2) its KV is the top among all top KVs (some of which are fake) in the scanner heap.
369   */
370  protected KeyValueScanner pollRealKV() throws IOException {
371    KeyValueScanner kvScanner = heap.poll();
372    if (kvScanner == null) {
373      return null;
374    }
375
376    while (kvScanner != null && !kvScanner.realSeekDone()) {
377      if (kvScanner.peek() != null) {
378        try {
379          kvScanner.enforceSeek();
380        } catch (IOException ioe) {
381          // Add the item to delayed close set in case it is leak from close
382          this.scannersForDelayedClose.add(kvScanner);
383          throw ioe;
384        }
385        Cell curKV = kvScanner.peek();
386        if (curKV != null) {
387          KeyValueScanner nextEarliestScanner = heap.peek();
388          if (nextEarliestScanner == null) {
389            // The heap is empty. Return the only possible scanner.
390            return kvScanner;
391          }
392
393          // Compare the current scanner to the next scanner. We try to avoid
394          // putting the current one back into the heap if possible.
395          Cell nextKV = nextEarliestScanner.peek();
396          if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
397            // We already have the scanner with the earliest KV, so return it.
398            return kvScanner;
399          }
400
401          // Otherwise, put the scanner back into the heap and let it compete
402          // against all other scanners (both those that have done a "real
403          // seek" and a "lazy seek").
404          heap.add(kvScanner);
405        } else {
406          // Close the scanner because we did a real seek and found out there
407          // are no more KVs.
408          this.scannersForDelayedClose.add(kvScanner);
409        }
410      } else {
411        // Close the scanner because it has already run out of KVs even before
412        // we had to do a real seek on it.
413        this.scannersForDelayedClose.add(kvScanner);
414      }
415      kvScanner = heap.poll();
416    }
417
418    return kvScanner;
419  }
420
421  /** Returns the current Heap */
422  public PriorityQueue<KeyValueScanner> getHeap() {
423    return this.heap;
424  }
425
426  KeyValueScanner getCurrentForTesting() {
427    return current;
428  }
429
430  @Override
431  public ExtendedCell getNextIndexedKey() {
432    // here we return the next index key from the top scanner
433    return current == null ? null : current.getNextIndexedKey();
434  }
435
436  @Override
437  public void shipped() throws IOException {
438    for (KeyValueScanner scanner : this.scannersForDelayedClose) {
439      scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
440      filesRead.addAll(scanner.getFilesRead());
441    }
442    this.scannersForDelayedClose.clear();
443    if (this.current != null) {
444      this.current.shipped();
445    }
446    if (this.heap != null) {
447      for (KeyValueScanner scanner : this.heap) {
448        scanner.shipped();
449      }
450    }
451  }
452}