001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.Iterator;
023import java.util.Set;
024import java.util.SortedSet;
025import java.util.function.IntConsumer;
026import org.apache.commons.lang3.NotImplementedException;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.ExtendedCell;
030import org.apache.hadoop.hbase.PrivateCellUtil;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
033import org.apache.yetus.audience.InterfaceAudience;
034
035/**
036 * A scanner of a single memstore segment.
037 */
038@InterfaceAudience.Private
039public class SegmentScanner implements KeyValueScanner {
040
041  // the observed structure
042  protected final Segment segment;
043  // the highest relevant MVCC
044  private long readPoint;
045  // the current iterator that can be reinitialized by
046  // seek(), backwardSeek(), or reseek()
047  protected Iterator<ExtendedCell> iter;
048  // the pre-calculated cell to be returned by peek()
049  protected ExtendedCell current = null;
  // (current is also the cell that next() hands back before advancing)
  // Flag telling updateCurrent() that it may stop skipping cells hidden by MVCC as soon as it
  // reaches a row after stopSkippingKVsRow. Only used for reversed scan
053  private boolean stopSkippingKVsIfNextRow = false;
054  // Stop skipping KeyValues for MVCC if finish this row. Only used for reversed scan
055  private ExtendedCell stopSkippingKVsRow;
056  // last iterated KVs by seek (to restore the iterator state after reseek)
057  private ExtendedCell last = null;
058
059  // flag to indicate if this scanner is closed
060  protected boolean closed = false;
061  private boolean isScanMetricsEnabled = false;
062
063  /**
064   * Scanners are ordered from 0 (oldest) to newest in increasing order.
065   */
066  protected SegmentScanner(Segment segment, long readPoint) {
067    this.segment = segment;
068    this.readPoint = readPoint;
069    // increase the reference count so the underlying structure will not be de-allocated
070    this.segment.incScannerCount();
071    iter = segment.iterator();
072    // the initialization of the current is required for working with heap of SegmentScanners
073    updateCurrent();
074    // Enable scan metrics for tracking bytes read after initialization of current
075    this.isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
076    if (current == null) {
077      // nothing to fetch from this scanner
078      close();
079    }
080  }
081
082  /**
083   * Look at the next Cell in this scanner, but do not iterate the scanner
084   * @return the currently observed Cell
085   */
086  @Override
087  public ExtendedCell peek() { // sanity check, the current should be always valid
088    if (closed) {
089      return null;
090    }
091    if (current != null && current.getSequenceId() > readPoint) {
092      throw new RuntimeException("current is invalid: read point is " + readPoint + ", "
093        + "while current sequence id is " + current.getSequenceId());
094    }
095    return current;
096  }
097
098  /**
099   * Return the next Cell in this scanner, iterating the scanner
100   * @return the next Cell or null if end of scanner
101   */
102  @Override
103  public ExtendedCell next() throws IOException {
104    if (closed) {
105      return null;
106    }
107    ExtendedCell oldCurrent = current;
108    updateCurrent(); // update the currently observed Cell
109    return oldCurrent;
110  }
111
112  /**
113   * Seek the scanner at or after the specified Cell.
114   * @param cell seek value
115   * @return true if scanner has values left, false if end of scanner
116   */
117  @Override
118  public boolean seek(ExtendedCell cell) throws IOException {
119    if (closed) {
120      return false;
121    }
122    if (cell == null) {
123      close();
124      return false;
125    }
126    // restart the iterator from new key
127    iter = getIterator(cell);
128    // last is going to be reinitialized in the next getNext() call
129    last = null;
130    updateCurrent();
131    return (current != null);
132  }
133
134  protected Iterator<ExtendedCell> getIterator(ExtendedCell cell) {
135    return segment.tailSet(cell).iterator();
136  }
137
138  /**
139   * Reseek the scanner at or after the specified KeyValue. This method is guaranteed to seek at or
140   * after the required key only if the key comes after the current position of the scanner. Should
141   * not be used to seek to a key which may come before the current position.
142   * @param cell seek value (should be non-null)
143   * @return true if scanner has values left, false if end of scanner
144   */
145  @Override
146  public boolean reseek(ExtendedCell cell) throws IOException {
147    if (closed) {
148      return false;
149    }
150    /*
151     * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. This code
152     * is executed concurrently with flush and puts, without locks. The ideal implementation for
153     * performance would use the sub skip list implicitly pointed by the iterator. Unfortunately the
154     * Java API does not offer a method to get it. So we remember the last keys we iterated to and
155     * restore the reseeked set to at least that point.
156     */
157    iter = getIterator(getHighest(cell, last));
158    updateCurrent();
159    return (current != null);
160  }
161
162  /**
163   * Seek the scanner at or before the row of specified Cell, it firstly tries to seek the scanner
164   * at or after the specified Cell, return if peek KeyValue of scanner has the same row with
165   * specified Cell, otherwise seek the scanner at the first Cell of the row which is the previous
166   * row of specified KeyValue
167   * @param key seek Cell
168   * @return true if the scanner is at the valid KeyValue, false if such Cell does not exist
169   */
170  @Override
171  public boolean backwardSeek(ExtendedCell key) throws IOException {
172    if (closed) {
173      return false;
174    }
175    seek(key); // seek forward then go backward
176    if (peek() == null || segment.compareRows(peek(), key) > 0) {
177      return seekToPreviousRow(key);
178    }
179    return true;
180  }
181
182  /**
183   * Seek the scanner at the first Cell of the row which is the previous row of specified key
184   * @param cell seek value
185   * @return true if the scanner at the first valid Cell of previous row, false if not existing such
186   *         Cell
187   */
188  @Override
189  public boolean seekToPreviousRow(ExtendedCell cell) throws IOException {
190    if (closed) {
191      return false;
192    }
193    boolean keepSeeking;
194    Cell key = cell;
195    do {
196      ExtendedCell firstKeyOnRow = PrivateCellUtil.createFirstOnRow(key);
197      SortedSet<ExtendedCell> cellHead = segment.headSet(firstKeyOnRow);
198      Cell lastCellBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
199      if (lastCellBeforeRow == null) {
200        current = null;
201        return false;
202      }
203      ExtendedCell firstKeyOnPreviousRow = PrivateCellUtil.createFirstOnRow(lastCellBeforeRow);
204      this.stopSkippingKVsIfNextRow = true;
205      this.stopSkippingKVsRow = firstKeyOnPreviousRow;
206      seek(firstKeyOnPreviousRow);
207      this.stopSkippingKVsIfNextRow = false;
208      if (
209        peek() == null || segment.getComparator().compareRows(peek(), firstKeyOnPreviousRow) > 0
210      ) {
211        keepSeeking = true;
212        key = firstKeyOnPreviousRow;
213        continue;
214      } else {
215        keepSeeking = false;
216      }
217    } while (keepSeeking);
218    return true;
219  }
220
221  /**
222   * Seek the scanner at the first KeyValue of last row
223   * @return true if scanner has values left, false if the underlying data is empty
224   */
225  @Override
226  public boolean seekToLastRow() throws IOException {
227    if (closed) {
228      return false;
229    }
230    ExtendedCell higherCell = segment.isEmpty() ? null : segment.last();
231    if (higherCell == null) {
232      return false;
233    }
234
235    ExtendedCell firstCellOnLastRow = PrivateCellUtil.createFirstOnRow(higherCell);
236
237    if (seek(firstCellOnLastRow)) {
238      return true;
239    } else {
240      return seekToPreviousRow(higherCell);
241    }
242  }
243
244  /**
245   * Close the KeyValue scanner.
246   */
247  @Override
248  public void close() {
249    if (closed) {
250      return;
251    }
252    getSegment().decScannerCount();
253    closed = true;
254  }
255
256  /**
257   * This functionality should be resolved in the higher level which is MemStoreScanner, currently
258   * returns true as default. Doesn't throw IllegalStateException in order not to change the
259   * signature of the overridden method
260   */
261  @Override
262  public boolean shouldUseScanner(Scan scan, HStore store, long oldestUnexpiredTS) {
263    return getSegment().shouldSeek(scan.getColumnFamilyTimeRange().getOrDefault(
264      store.getColumnFamilyDescriptor().getName(), scan.getTimeRange()), oldestUnexpiredTS);
265  }
266
267  @Override
268  public boolean requestSeek(ExtendedCell c, boolean forward, boolean useBloom) throws IOException {
269    return NonLazyKeyValueScanner.doRealSeek(this, c, forward);
270  }
271
272  /**
273   * This scanner is working solely on the in-memory MemStore and doesn't work on store files,
274   * MutableCellSetSegmentScanner always does the seek, therefore always returning true.
275   */
276  @Override
277  public boolean realSeekDone() {
278    return true;
279  }
280
281  /**
282   * This function should be never called on scanners that always do real seek operations (i.e. most
283   * of the scanners and also this one). The easiest way to achieve this is to call
284   * {@link #realSeekDone()} first.
285   */
286  @Override
287  public void enforceSeek() throws IOException {
288    throw new NotImplementedException("enforceSeek cannot be called on a SegmentScanner");
289  }
290
291  /** Returns true if this is a file scanner. Otherwise a memory scanner is assumed. */
292  @Override
293  public boolean isFileScanner() {
294    return false;
295  }
296
297  @Override
298  public void recordBlockSize(IntConsumer blockSizeConsumer) {
299    // do nothing
300  }
301
302  @Override
303  public Path getFilePath() {
304    return null;
305  }
306
307  /**
308   * Returns the set of store file paths that were successfully read by this scanner. This
309   * implementation always returns an empty set (segment scanners do not track file paths).
310   */
311  @Override
312  public Set<Path> getFilesRead() {
313    return Collections.emptySet();
314  }
315
316  /**
317   * @return the next key in the index (the key to seek to the next block) if known, or null
318   *         otherwise Not relevant for in-memory scanner
319   */
320  @Override
321  public ExtendedCell getNextIndexedKey() {
322    return null;
323  }
324
325  /**
326   * Called after a batch of rows scanned (RPC) and set to be returned to client. Any in between
327   * cleanup can be done here. Nothing to be done for MutableCellSetSegmentScanner.
328   */
329  @Override
330  public void shipped() throws IOException {
331    // do nothing
332  }
333
334  // debug method
335  @Override
336  public String toString() {
337    String res = "Store segment scanner of type " + this.getClass().getName() + "; ";
338    res += "Scanner order " + getScannerOrder() + "; ";
339    res += getSegment().toString();
340    return res;
341  }
342
343  /********************* Private Methods **********************/
344
345  private Segment getSegment() {
346    return segment;
347  }
348
349  /**
350   * Private internal method for iterating over the segment, skipping the cells with irrelevant MVCC
351   */
352  protected void updateCurrent() {
353    ExtendedCell next = null;
354    long totalBytesRead = 0;
355
356    try {
357      while (iter.hasNext()) {
358        next = iter.next();
359        if (isScanMetricsEnabled) {
360          // Batch collect bytes to reduce method call overhead
361          totalBytesRead += Segment.getCellLength(next);
362        }
363        if (next.getSequenceId() <= this.readPoint) {
364          current = next;
365          return;// skip irrelevant versions
366        }
367        // for backwardSeek() stay in the boundaries of a single row
368        if (stopSkippingKVsIfNextRow && segment.compareRows(next, stopSkippingKVsRow) > 0) {
369          current = null;
370          return;
371        }
372      } // end of while
373
374      current = null; // nothing found
375    } finally {
376      // Add accumulated bytes before returning
377      if (totalBytesRead > 0) {
378        ThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(totalBytesRead);
379      }
380      if (next != null) {
381        // in all cases, remember the last KV we iterated to, needed for reseek()
382        last = next;
383      }
384    }
385  }
386
387  /**
388   * Private internal method that returns the higher of the two key values, or null if they are both
389   * null
390   */
391  private ExtendedCell getHighest(ExtendedCell first, ExtendedCell second) {
392    if (first == null && second == null) {
393      return null;
394    }
395    if (first != null && second != null) {
396      int compare = segment.compare(first, second);
397      return (compare > 0 ? first : second);
398    }
399    return (first != null ? first : second);
400  }
401}