001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.Iterator;
022import java.util.SortedSet;
023import java.util.function.IntConsumer;
024import org.apache.commons.lang3.NotImplementedException;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.Cell;
027import org.apache.hadoop.hbase.ExtendedCell;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.client.Scan;
030import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
031import org.apache.yetus.audience.InterfaceAudience;
032
033/**
034 * A scanner of a single memstore segment.
035 */
036@InterfaceAudience.Private
037public class SegmentScanner implements KeyValueScanner {
038
039  // the observed structure
040  protected final Segment segment;
041  // the highest relevant MVCC
042  private long readPoint;
043  // the current iterator that can be reinitialized by
044  // seek(), backwardSeek(), or reseek()
045  protected Iterator<ExtendedCell> iter;
046  // the pre-calculated cell to be returned by peek()
047  protected ExtendedCell current = null;
048  // or next()
049  // A flag represents whether could stop skipping KeyValues for MVCC
050  // if have encountered the next row. Only used for reversed scan
051  private boolean stopSkippingKVsIfNextRow = false;
052  // Stop skipping KeyValues for MVCC if finish this row. Only used for reversed scan
053  private ExtendedCell stopSkippingKVsRow;
054  // last iterated KVs by seek (to restore the iterator state after reseek)
055  private ExtendedCell last = null;
056
057  // flag to indicate if this scanner is closed
058  protected boolean closed = false;
059  private boolean isScanMetricsEnabled = false;
060
061  /**
062   * Scanners are ordered from 0 (oldest) to newest in increasing order.
063   */
064  protected SegmentScanner(Segment segment, long readPoint) {
065    this.segment = segment;
066    this.readPoint = readPoint;
067    // increase the reference count so the underlying structure will not be de-allocated
068    this.segment.incScannerCount();
069    iter = segment.iterator();
070    // the initialization of the current is required for working with heap of SegmentScanners
071    updateCurrent();
072    // Enable scan metrics for tracking bytes read after initialization of current
073    this.isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
074    if (current == null) {
075      // nothing to fetch from this scanner
076      close();
077    }
078  }
079
080  /**
081   * Look at the next Cell in this scanner, but do not iterate the scanner
082   * @return the currently observed Cell
083   */
084  @Override
085  public ExtendedCell peek() { // sanity check, the current should be always valid
086    if (closed) {
087      return null;
088    }
089    if (current != null && current.getSequenceId() > readPoint) {
090      throw new RuntimeException("current is invalid: read point is " + readPoint + ", "
091        + "while current sequence id is " + current.getSequenceId());
092    }
093    return current;
094  }
095
096  /**
097   * Return the next Cell in this scanner, iterating the scanner
098   * @return the next Cell or null if end of scanner
099   */
100  @Override
101  public ExtendedCell next() throws IOException {
102    if (closed) {
103      return null;
104    }
105    ExtendedCell oldCurrent = current;
106    updateCurrent(); // update the currently observed Cell
107    return oldCurrent;
108  }
109
110  /**
111   * Seek the scanner at or after the specified Cell.
112   * @param cell seek value
113   * @return true if scanner has values left, false if end of scanner
114   */
115  @Override
116  public boolean seek(ExtendedCell cell) throws IOException {
117    if (closed) {
118      return false;
119    }
120    if (cell == null) {
121      close();
122      return false;
123    }
124    // restart the iterator from new key
125    iter = getIterator(cell);
126    // last is going to be reinitialized in the next getNext() call
127    last = null;
128    updateCurrent();
129    return (current != null);
130  }
131
132  protected Iterator<ExtendedCell> getIterator(ExtendedCell cell) {
133    return segment.tailSet(cell).iterator();
134  }
135
136  /**
137   * Reseek the scanner at or after the specified KeyValue. This method is guaranteed to seek at or
138   * after the required key only if the key comes after the current position of the scanner. Should
139   * not be used to seek to a key which may come before the current position.
140   * @param cell seek value (should be non-null)
141   * @return true if scanner has values left, false if end of scanner
142   */
143  @Override
144  public boolean reseek(ExtendedCell cell) throws IOException {
145    if (closed) {
146      return false;
147    }
148    /*
149     * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. This code
150     * is executed concurrently with flush and puts, without locks. The ideal implementation for
151     * performance would use the sub skip list implicitly pointed by the iterator. Unfortunately the
152     * Java API does not offer a method to get it. So we remember the last keys we iterated to and
153     * restore the reseeked set to at least that point.
154     */
155    iter = getIterator(getHighest(cell, last));
156    updateCurrent();
157    return (current != null);
158  }
159
160  /**
161   * Seek the scanner at or before the row of specified Cell, it firstly tries to seek the scanner
162   * at or after the specified Cell, return if peek KeyValue of scanner has the same row with
163   * specified Cell, otherwise seek the scanner at the first Cell of the row which is the previous
164   * row of specified KeyValue
165   * @param key seek Cell
166   * @return true if the scanner is at the valid KeyValue, false if such Cell does not exist
167   */
168  @Override
169  public boolean backwardSeek(ExtendedCell key) throws IOException {
170    if (closed) {
171      return false;
172    }
173    seek(key); // seek forward then go backward
174    if (peek() == null || segment.compareRows(peek(), key) > 0) {
175      return seekToPreviousRow(key);
176    }
177    return true;
178  }
179
180  /**
181   * Seek the scanner at the first Cell of the row which is the previous row of specified key
182   * @param cell seek value
183   * @return true if the scanner at the first valid Cell of previous row, false if not existing such
184   *         Cell
185   */
186  @Override
187  public boolean seekToPreviousRow(ExtendedCell cell) throws IOException {
188    if (closed) {
189      return false;
190    }
191    boolean keepSeeking;
192    Cell key = cell;
193    do {
194      ExtendedCell firstKeyOnRow = PrivateCellUtil.createFirstOnRow(key);
195      SortedSet<ExtendedCell> cellHead = segment.headSet(firstKeyOnRow);
196      Cell lastCellBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
197      if (lastCellBeforeRow == null) {
198        current = null;
199        return false;
200      }
201      ExtendedCell firstKeyOnPreviousRow = PrivateCellUtil.createFirstOnRow(lastCellBeforeRow);
202      this.stopSkippingKVsIfNextRow = true;
203      this.stopSkippingKVsRow = firstKeyOnPreviousRow;
204      seek(firstKeyOnPreviousRow);
205      this.stopSkippingKVsIfNextRow = false;
206      if (
207        peek() == null || segment.getComparator().compareRows(peek(), firstKeyOnPreviousRow) > 0
208      ) {
209        keepSeeking = true;
210        key = firstKeyOnPreviousRow;
211        continue;
212      } else {
213        keepSeeking = false;
214      }
215    } while (keepSeeking);
216    return true;
217  }
218
219  /**
220   * Seek the scanner at the first KeyValue of last row
221   * @return true if scanner has values left, false if the underlying data is empty
222   */
223  @Override
224  public boolean seekToLastRow() throws IOException {
225    if (closed) {
226      return false;
227    }
228    ExtendedCell higherCell = segment.isEmpty() ? null : segment.last();
229    if (higherCell == null) {
230      return false;
231    }
232
233    ExtendedCell firstCellOnLastRow = PrivateCellUtil.createFirstOnRow(higherCell);
234
235    if (seek(firstCellOnLastRow)) {
236      return true;
237    } else {
238      return seekToPreviousRow(higherCell);
239    }
240  }
241
242  /**
243   * Close the KeyValue scanner.
244   */
245  @Override
246  public void close() {
247    if (closed) {
248      return;
249    }
250    getSegment().decScannerCount();
251    closed = true;
252  }
253
254  /**
255   * This functionality should be resolved in the higher level which is MemStoreScanner, currently
256   * returns true as default. Doesn't throw IllegalStateException in order not to change the
257   * signature of the overridden method
258   */
259  @Override
260  public boolean shouldUseScanner(Scan scan, HStore store, long oldestUnexpiredTS) {
261    return getSegment().shouldSeek(scan.getColumnFamilyTimeRange().getOrDefault(
262      store.getColumnFamilyDescriptor().getName(), scan.getTimeRange()), oldestUnexpiredTS);
263  }
264
265  @Override
266  public boolean requestSeek(ExtendedCell c, boolean forward, boolean useBloom) throws IOException {
267    return NonLazyKeyValueScanner.doRealSeek(this, c, forward);
268  }
269
270  /**
271   * This scanner is working solely on the in-memory MemStore and doesn't work on store files,
272   * MutableCellSetSegmentScanner always does the seek, therefore always returning true.
273   */
274  @Override
275  public boolean realSeekDone() {
276    return true;
277  }
278
279  /**
280   * This function should be never called on scanners that always do real seek operations (i.e. most
281   * of the scanners and also this one). The easiest way to achieve this is to call
282   * {@link #realSeekDone()} first.
283   */
284  @Override
285  public void enforceSeek() throws IOException {
286    throw new NotImplementedException("enforceSeek cannot be called on a SegmentScanner");
287  }
288
289  /** Returns true if this is a file scanner. Otherwise a memory scanner is assumed. */
290  @Override
291  public boolean isFileScanner() {
292    return false;
293  }
294
295  @Override
296  public void recordBlockSize(IntConsumer blockSizeConsumer) {
297    // do nothing
298  }
299
300  @Override
301  public Path getFilePath() {
302    return null;
303  }
304
305  /**
306   * @return the next key in the index (the key to seek to the next block) if known, or null
307   *         otherwise Not relevant for in-memory scanner
308   */
309  @Override
310  public ExtendedCell getNextIndexedKey() {
311    return null;
312  }
313
314  /**
315   * Called after a batch of rows scanned (RPC) and set to be returned to client. Any in between
316   * cleanup can be done here. Nothing to be done for MutableCellSetSegmentScanner.
317   */
318  @Override
319  public void shipped() throws IOException {
320    // do nothing
321  }
322
323  // debug method
324  @Override
325  public String toString() {
326    String res = "Store segment scanner of type " + this.getClass().getName() + "; ";
327    res += "Scanner order " + getScannerOrder() + "; ";
328    res += getSegment().toString();
329    return res;
330  }
331
332  /********************* Private Methods **********************/
333
334  private Segment getSegment() {
335    return segment;
336  }
337
338  /**
339   * Private internal method for iterating over the segment, skipping the cells with irrelevant MVCC
340   */
341  protected void updateCurrent() {
342    ExtendedCell next = null;
343    long totalBytesRead = 0;
344
345    try {
346      while (iter.hasNext()) {
347        next = iter.next();
348        if (isScanMetricsEnabled) {
349          // Batch collect bytes to reduce method call overhead
350          totalBytesRead += Segment.getCellLength(next);
351        }
352        if (next.getSequenceId() <= this.readPoint) {
353          current = next;
354          return;// skip irrelevant versions
355        }
356        // for backwardSeek() stay in the boundaries of a single row
357        if (stopSkippingKVsIfNextRow && segment.compareRows(next, stopSkippingKVsRow) > 0) {
358          current = null;
359          return;
360        }
361      } // end of while
362
363      current = null; // nothing found
364    } finally {
365      // Add accumulated bytes before returning
366      if (totalBytesRead > 0) {
367        ThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(totalBytesRead);
368      }
369      if (next != null) {
370        // in all cases, remember the last KV we iterated to, needed for reseek()
371        last = next;
372      }
373    }
374  }
375
376  /**
377   * Private internal method that returns the higher of the two key values, or null if they are both
378   * null
379   */
380  private ExtendedCell getHighest(ExtendedCell first, ExtendedCell second) {
381    if (first == null && second == null) {
382      return null;
383    }
384    if (first != null && second != null) {
385      int compare = segment.compare(first, second);
386      return (compare > 0 ? first : second);
387    }
388    return (first != null ? first : second);
389  }
390}