001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.filter;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.List;
023import java.util.Objects;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellUtil;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.PrivateCellUtil;
028import org.apache.hadoop.hbase.client.ClientUtil;
029import org.apache.hadoop.hbase.exceptions.DeserializationException;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
034import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
037
038/**
039 * Filter to support scan multiple row key ranges. It can construct the row key ranges from the
040 * passed list which can be accessed by each region server. HBase is quite efficient when scanning
041 * only one small row key range. If user needs to specify multiple row key ranges in one scan, the
042 * typical solutions are: 1. through FilterList which is a list of row key Filters, 2. using the SQL
043 * layer over HBase to join with two table, such as hive, phoenix etc. However, both solutions are
044 * inefficient. Both of them can't utilize the range info to perform fast forwarding during scan
045 * which is quite time consuming. If the number of ranges are quite big (e.g. millions), join is a
046 * proper solution though it is slow. However, there are cases that user wants to specify a small
047 * number of ranges to scan (e.g. <1000 ranges). Both solutions can't provide satisfactory
048 * performance in such case. MultiRowRangeFilter is to support such usec ase (scan multiple row key
049 * ranges), which can construct the row key ranges from user specified list and perform
050 * fast-forwarding during scan. Thus, the scan will be quite efficient.
051 */
052@InterfaceAudience.Public
053public class MultiRowRangeFilter extends FilterBase {
054
055  private static final int ROW_BEFORE_FIRST_RANGE = -1;
056
057  private final List<RowRange> rangeList;
058  private final RangeIteration ranges;
059
060  private boolean done = false;
061  private int index;
062  private BasicRowRange range;
063  private ReturnCode currentReturnCode;
064
065  /**
066   * @param list A list of <code>RowRange</code>
067   */
068  public MultiRowRangeFilter(List<RowRange> list) {
069    // We don't use rangeList anywhere else, but keeping it lets us pay a little
070    // memory to avoid touching the serialization logic.
071    this.rangeList = Collections.unmodifiableList(sortAndMerge(list));
072    this.ranges = new RangeIteration(rangeList);
073  }
074
075  /**
076   * Constructor for creating a <code>MultiRowRangeFilter</code> from multiple rowkey prefixes. As
077   * <code>MultiRowRangeFilter</code> javadoc says (See the solution 1 of the first statement), if
078   * you try to create a filter list that scans row keys corresponding to given prefixes (e.g.,
079   * <code>FilterList</code> composed of multiple <code>PrefixFilter</code>s), this constructor
080   * provides a way to avoid creating an inefficient one.
081   * @param rowKeyPrefixes the array of byte array
082   */
083  public MultiRowRangeFilter(byte[][] rowKeyPrefixes) {
084    this(createRangeListFromRowKeyPrefixes(rowKeyPrefixes));
085  }
086
087  private static List<RowRange> createRangeListFromRowKeyPrefixes(byte[][] rowKeyPrefixes) {
088    if (rowKeyPrefixes == null) {
089      throw new IllegalArgumentException("Invalid rowkey prefixes");
090    }
091
092    List<RowRange> list = new ArrayList<>();
093    for (byte[] rowKeyPrefix : rowKeyPrefixes) {
094      byte[] stopRow = ClientUtil.calculateTheClosestNextRowKeyForPrefix(rowKeyPrefix);
095      list.add(new RowRange(rowKeyPrefix, true, stopRow, false));
096    }
097    return list;
098  }
099
100  public List<RowRange> getRowRanges() {
101    // Used by hbase-rest
102    return this.rangeList;
103  }
104
105  @Override
106  public boolean filterAllRemaining() {
107    return done;
108  }
109
110  @Override
111  public boolean filterRowKey(Cell firstRowCell) {
112    if (filterAllRemaining()) return true;
113
114    // N.b. We can only do this after we're iterating over records. If we try to do
115    // it before, the Scan (and this filter) may not yet be fully initialized. This is a
116    // wart on Filter and something that'd be nice to clean up (like CP's in HBase2.0)
117    if (!ranges.isInitialized()) {
118      ranges.initialize(isReversed());
119    }
120
121    // If it is the first time of running, calculate the current range index for
122    // the row key. If index is out of bound which happens when the start row
123    // user sets is after the largest stop row of the ranges, stop the scan.
124    // If row key is after the current range, find the next range and update index.
125    byte[] rowArr = firstRowCell.getRowArray();
126    int length = firstRowCell.getRowLength();
127    int offset = firstRowCell.getRowOffset();
128    if (!ranges.hasFoundFirstRange() || !range.contains(rowArr, offset, length)) {
129      byte[] rowkey = CellUtil.cloneRow(firstRowCell);
130      index = ranges.getNextRangeIndex(rowkey);
131      if (ranges.isIterationComplete(index)) {
132        done = true;
133        currentReturnCode = ReturnCode.NEXT_ROW;
134        return false;
135      }
136      if (index != ROW_BEFORE_FIRST_RANGE) {
137        range = ranges.get(index);
138      } else {
139        range = ranges.get(0);
140      }
141      if (ranges.isExclusive()) {
142        ranges.resetExclusive();
143        currentReturnCode = ReturnCode.NEXT_ROW;
144        return false;
145      }
146      if (!ranges.hasFoundFirstRange()) {
147        if (index != ROW_BEFORE_FIRST_RANGE) {
148          currentReturnCode = ReturnCode.INCLUDE;
149        } else {
150          currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
151        }
152        ranges.setFoundFirstRange();
153      } else {
154        if (range.contains(rowArr, offset, length)) {
155          currentReturnCode = ReturnCode.INCLUDE;
156        } else {
157          currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
158        }
159      }
160    } else {
161      currentReturnCode = ReturnCode.INCLUDE;
162    }
163    return false;
164  }
165
166  @Deprecated
167  @Override
168  public ReturnCode filterKeyValue(final Cell ignored) {
169    return filterCell(ignored);
170  }
171
172  @Override
173  public ReturnCode filterCell(final Cell ignored) {
174    return currentReturnCode;
175  }
176
177  @Override
178  public Cell getNextCellHint(Cell currentKV) {
179    // skip to the next range's start row
180    // #getComparisonData lets us avoid the `if (reversed)` branch
181    byte[] comparisonData = range.getComparisonData();
182    return PrivateCellUtil.createFirstOnRow(comparisonData, 0, (short) comparisonData.length);
183  }
184
185  /** Returns The filter serialized using pb */
186  @Override
187  public byte[] toByteArray() {
188    FilterProtos.MultiRowRangeFilter.Builder builder =
189      FilterProtos.MultiRowRangeFilter.newBuilder();
190    for (RowRange range : rangeList) {
191      if (range != null) {
192        FilterProtos.RowRange.Builder rangebuilder = FilterProtos.RowRange.newBuilder();
193        if (range.startRow != null)
194          rangebuilder.setStartRow(UnsafeByteOperations.unsafeWrap(range.startRow));
195        rangebuilder.setStartRowInclusive(range.startRowInclusive);
196        if (range.stopRow != null)
197          rangebuilder.setStopRow(UnsafeByteOperations.unsafeWrap(range.stopRow));
198        rangebuilder.setStopRowInclusive(range.stopRowInclusive);
199        builder.addRowRangeList(rangebuilder.build());
200      }
201    }
202    return builder.build().toByteArray();
203  }
204
205  /**
206   * Parse a serialized representation of {@link MultiRowRangeFilter}
207   * @param pbBytes A pb serialized instance
208   * @return An instance of {@link MultiRowRangeFilter}
209   * @throws DeserializationException if an error occurred
210   * @see #toByteArray
211   */
212  public static MultiRowRangeFilter parseFrom(final byte[] pbBytes)
213    throws DeserializationException {
214    FilterProtos.MultiRowRangeFilter proto;
215    try {
216      proto = FilterProtos.MultiRowRangeFilter.parseFrom(pbBytes);
217    } catch (InvalidProtocolBufferException e) {
218      throw new DeserializationException(e);
219    }
220    int length = proto.getRowRangeListCount();
221    List<FilterProtos.RowRange> rangeProtos = proto.getRowRangeListList();
222    List<RowRange> rangeList = new ArrayList<>(length);
223    for (FilterProtos.RowRange rangeProto : rangeProtos) {
224      RowRange range =
225        new RowRange(rangeProto.hasStartRow() ? rangeProto.getStartRow().toByteArray() : null,
226          rangeProto.getStartRowInclusive(),
227          rangeProto.hasStopRow() ? rangeProto.getStopRow().toByteArray() : null,
228          rangeProto.getStopRowInclusive());
229      rangeList.add(range);
230    }
231    return new MultiRowRangeFilter(rangeList);
232  }
233
234  /**
235   * Returns true if and only if the fields of the filter that are serialized are equal to the
236   * corresponding fields in other. Used for testing.
237   */
238  @Override
239  boolean areSerializedFieldsEqual(Filter o) {
240    if (o == this) {
241      return true;
242    }
243    if (!(o instanceof MultiRowRangeFilter)) {
244      return false;
245    }
246    MultiRowRangeFilter other = (MultiRowRangeFilter) o;
247    if (this.rangeList.size() != other.rangeList.size()) return false;
248    for (int i = 0; i < rangeList.size(); ++i) {
249      RowRange thisRange = this.rangeList.get(i);
250      RowRange otherRange = other.rangeList.get(i);
251      if (
252        !(Bytes.equals(thisRange.startRow, otherRange.startRow)
253          && Bytes.equals(thisRange.stopRow, otherRange.stopRow)
254          && (thisRange.startRowInclusive == otherRange.startRowInclusive)
255          && (thisRange.stopRowInclusive == otherRange.stopRowInclusive))
256      ) {
257        return false;
258      }
259    }
260    return true;
261  }
262
263  /**
264   * sort the ranges and if the ranges with overlap, then merge them.
265   * @param ranges the list of ranges to sort and merge.
266   * @return the ranges after sort and merge.
267   */
268  public static List<RowRange> sortAndMerge(List<RowRange> ranges) {
269    if (ranges.isEmpty()) {
270      throw new IllegalArgumentException("No ranges found.");
271    }
272    List<RowRange> invalidRanges = new ArrayList<>();
273    List<RowRange> newRanges = new ArrayList<>(ranges.size());
274    Collections.sort(ranges);
275    if (ranges.get(0).isValid()) {
276      if (ranges.size() == 1) {
277        newRanges.add(ranges.get(0));
278      }
279    } else {
280      invalidRanges.add(ranges.get(0));
281    }
282
283    byte[] lastStartRow = ranges.get(0).startRow;
284    boolean lastStartRowInclusive = ranges.get(0).startRowInclusive;
285    byte[] lastStopRow = ranges.get(0).stopRow;
286    boolean lastStopRowInclusive = ranges.get(0).stopRowInclusive;
287    int i = 1;
288    for (; i < ranges.size(); i++) {
289      RowRange range = ranges.get(i);
290      if (!range.isValid()) {
291        invalidRanges.add(range);
292      }
293      if (Bytes.equals(lastStopRow, HConstants.EMPTY_BYTE_ARRAY)) {
294        newRanges.add(
295          new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
296        break;
297      }
298      // with overlap in the ranges
299      if (
300        (Bytes.compareTo(lastStopRow, range.startRow) > 0)
301          || (Bytes.compareTo(lastStopRow, range.startRow) == 0
302            && !(lastStopRowInclusive == false && range.isStartRowInclusive() == false))
303      ) {
304        if (Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
305          newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
306            range.stopRowInclusive));
307          break;
308        }
309        // if first range contains second range, ignore the second range
310        if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
311          if ((Bytes.compareTo(lastStopRow, range.stopRow) == 0)) {
312            if (lastStopRowInclusive == true || range.stopRowInclusive == true) {
313              lastStopRowInclusive = true;
314            }
315          }
316          if ((i + 1) == ranges.size()) {
317            newRanges.add(
318              new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
319          }
320        } else {
321          lastStopRow = range.stopRow;
322          lastStopRowInclusive = range.stopRowInclusive;
323          if ((i + 1) < ranges.size()) {
324            i++;
325            range = ranges.get(i);
326            if (!range.isValid()) {
327              invalidRanges.add(range);
328            }
329          } else {
330            newRanges.add(
331              new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
332            break;
333          }
334          while (
335            (Bytes.compareTo(lastStopRow, range.startRow) > 0)
336              || (Bytes.compareTo(lastStopRow, range.startRow) == 0
337                && (lastStopRowInclusive == true || range.startRowInclusive == true))
338          ) {
339            if (Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
340              break;
341            }
342            // if this first range contain second range, ignore the second range
343            if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
344              if (lastStopRowInclusive == true || range.stopRowInclusive == true) {
345                lastStopRowInclusive = true;
346              }
347              i++;
348              if (i < ranges.size()) {
349                range = ranges.get(i);
350                if (!range.isValid()) {
351                  invalidRanges.add(range);
352                }
353              } else {
354                break;
355              }
356            } else {
357              lastStopRow = range.stopRow;
358              lastStopRowInclusive = range.stopRowInclusive;
359              i++;
360              if (i < ranges.size()) {
361                range = ranges.get(i);
362                if (!range.isValid()) {
363                  invalidRanges.add(range);
364                }
365              } else {
366                break;
367              }
368            }
369          }
370          if (Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
371            if (
372              (Bytes.compareTo(lastStopRow, range.startRow) < 0)
373                || (Bytes.compareTo(lastStopRow, range.startRow) == 0
374                  && lastStopRowInclusive == false && range.startRowInclusive == false)
375            ) {
376              newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
377                lastStopRowInclusive));
378              newRanges.add(range);
379            } else {
380              newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
381                range.stopRowInclusive));
382              break;
383            }
384          }
385          newRanges.add(
386            new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
387          if ((i + 1) == ranges.size()) {
388            newRanges.add(range);
389          }
390          lastStartRow = range.startRow;
391          lastStartRowInclusive = range.startRowInclusive;
392          lastStopRow = range.stopRow;
393          lastStopRowInclusive = range.stopRowInclusive;
394        }
395      } else {
396        newRanges.add(
397          new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
398        if ((i + 1) == ranges.size()) {
399          newRanges.add(range);
400        }
401        lastStartRow = range.startRow;
402        lastStartRowInclusive = range.startRowInclusive;
403        lastStopRow = range.stopRow;
404        lastStopRowInclusive = range.stopRowInclusive;
405      }
406    }
407    // check the remaining ranges
408    for (int j = i; j < ranges.size(); j++) {
409      if (!ranges.get(j).isValid()) {
410        invalidRanges.add(ranges.get(j));
411      }
412    }
413    // if invalid range exists, throw the exception
414    if (invalidRanges.size() != 0) {
415      throwExceptionForInvalidRanges(invalidRanges, true);
416    }
417    // If no valid ranges found, throw the exception
418    if (newRanges.isEmpty()) {
419      throw new IllegalArgumentException("No valid ranges found.");
420    }
421    return newRanges;
422  }
423
424  private static void throwExceptionForInvalidRanges(List<RowRange> invalidRanges,
425    boolean details) {
426    StringBuilder sb = new StringBuilder();
427    sb.append(invalidRanges.size()).append(" invaild ranges.\n");
428    if (details) {
429      for (RowRange range : invalidRanges) {
430        sb.append("Invalid range: start row => " + Bytes.toString(range.startRow) + ", stop row => "
431          + Bytes.toString(range.stopRow)).append('\n');
432      }
433    }
434    throw new IllegalArgumentException(sb.toString());
435  }
436
437  private static abstract class BasicRowRange implements Comparable<BasicRowRange> {
438    protected byte[] startRow;
439    protected boolean startRowInclusive = true;
440    protected byte[] stopRow;
441    protected boolean stopRowInclusive = false;
442
443    public BasicRowRange() {
444    }
445
446    /**
447     * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
448     * start row of the table. If the stopRow is empty or null, set it to
449     * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
450     */
451    public BasicRowRange(String startRow, boolean startRowInclusive, String stopRow,
452      boolean stopRowInclusive) {
453      this(
454        (startRow == null || startRow.isEmpty())
455          ? HConstants.EMPTY_BYTE_ARRAY
456          : Bytes.toBytes(startRow),
457        startRowInclusive,
458        (stopRow == null || stopRow.isEmpty())
459          ? HConstants.EMPTY_BYTE_ARRAY
460          : Bytes.toBytes(stopRow),
461        stopRowInclusive);
462    }
463
464    public BasicRowRange(byte[] startRow, boolean startRowInclusive, byte[] stopRow,
465      boolean stopRowInclusive) {
466      this.startRow = (startRow == null) ? HConstants.EMPTY_BYTE_ARRAY : startRow;
467      this.startRowInclusive = startRowInclusive;
468      this.stopRow = (stopRow == null) ? HConstants.EMPTY_BYTE_ARRAY : stopRow;
469      this.stopRowInclusive = stopRowInclusive;
470    }
471
472    public byte[] getStartRow() {
473      return startRow;
474    }
475
476    public byte[] getStopRow() {
477      return stopRow;
478    }
479
480    /** Returns if start row is inclusive. */
481    public boolean isStartRowInclusive() {
482      return startRowInclusive;
483    }
484
485    /** Returns if stop row is inclusive. */
486    public boolean isStopRowInclusive() {
487      return stopRowInclusive;
488    }
489
490    public boolean contains(byte[] row) {
491      return contains(row, 0, row.length);
492    }
493
494    public boolean contains(byte[] buffer, int offset, int length) {
495      if (startRowInclusive) {
496        if (stopRowInclusive) {
497          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
498            && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
499              || Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
500        } else {
501          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
502            && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
503              || Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
504        }
505      } else {
506        if (stopRowInclusive) {
507          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
508            && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
509              || Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
510        } else {
511          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
512            && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
513              || Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
514        }
515      }
516    }
517
518    public boolean isValid() {
519      return Bytes.equals(startRow, HConstants.EMPTY_BYTE_ARRAY)
520        || Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
521        || Bytes.compareTo(startRow, stopRow) < 0
522        || (Bytes.compareTo(startRow, stopRow) == 0 && stopRowInclusive == true);
523    }
524
525    @Override
526    public boolean equals(Object obj) {
527      if (!(obj instanceof BasicRowRange)) {
528        return false;
529      }
530      if (this == obj) {
531        return true;
532      }
533      BasicRowRange rr = (BasicRowRange) obj;
534      return Bytes.equals(this.stopRow, rr.getStopRow())
535        && Bytes.equals(this.startRow, this.getStartRow())
536        && this.startRowInclusive == rr.isStartRowInclusive()
537        && this.stopRowInclusive == rr.isStopRowInclusive();
538    }
539
540    @Override
541    public int hashCode() {
542      return Objects.hash(Bytes.hashCode(this.stopRow), Bytes.hashCode(this.startRow),
543        this.startRowInclusive, this.stopRowInclusive);
544    }
545
546    /**
547     * Returns the data to be used to compare {@code this} to another object.
548     */
549    public abstract byte[] getComparisonData();
550
551    /**
552     * Returns whether the bounding row used for binary-search is inclusive or not. For forward
553     * scans, we would check the starRow, but we would check the stopRow for the reverse scan case.
554     */
555    public abstract boolean isSearchRowInclusive();
556
557    @Override
558    public int compareTo(BasicRowRange other) {
559      byte[] left;
560      byte[] right;
561      if (isAscendingOrder()) {
562        left = this.getComparisonData();
563        right = other.getComparisonData();
564      } else {
565        left = other.getComparisonData();
566        right = this.getComparisonData();
567      }
568      return Bytes.compareTo(left, right);
569    }
570
571    public abstract boolean isAscendingOrder();
572  }
573
574  /**
575   * Internal RowRange that reverses the sort-order to handle reverse scans.
576   */
577  @InterfaceAudience.Private
578  private static class ReversedRowRange extends BasicRowRange {
579    public ReversedRowRange(byte[] startRow, boolean startRowInclusive, byte[] stopRow,
580      boolean stopRowInclusive) {
581      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
582    }
583
584    @Override
585    public byte[] getComparisonData() {
586      return this.stopRow;
587    }
588
589    @Override
590    public boolean isSearchRowInclusive() {
591      return this.stopRowInclusive;
592    }
593
594    @Override
595    public boolean isAscendingOrder() {
596      return false;
597    }
598  }
599
600  @InterfaceAudience.Public
601  public static class RowRange extends BasicRowRange {
602    public RowRange() {
603    }
604
605    /**
606     * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
607     * start row of the table. If the stopRow is empty or null, set it to
608     * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
609     */
610    public RowRange(String startRow, boolean startRowInclusive, String stopRow,
611      boolean stopRowInclusive) {
612      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
613    }
614
615    public RowRange(byte[] startRow, boolean startRowInclusive, byte[] stopRow,
616      boolean stopRowInclusive) {
617      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
618    }
619
620    @Override
621    public byte[] getComparisonData() {
622      return startRow;
623    }
624
625    @Override
626    public boolean isSearchRowInclusive() {
627      return startRowInclusive;
628    }
629
630    @Override
631    public boolean isAscendingOrder() {
632      return true;
633    }
634  }
635
636  /**
637   * Abstraction over the ranges of rows to return from this filter, regardless of forward or
638   * reverse scans being used. This Filter can use this class, agnostic of iteration direction, as
639   * the same algorithm can be applied in both cases.
640   */
641  @InterfaceAudience.Private
642  private static class RangeIteration {
643    private boolean exclusive = false;
644    private boolean initialized = false;
645    private boolean foundFirstRange = false;
646    private boolean reversed = false;
647    private final List<RowRange> sortedAndMergedRanges;
648    private List<? extends BasicRowRange> ranges;
649
650    public RangeIteration(List<RowRange> sortedAndMergedRanges) {
651      this.sortedAndMergedRanges = sortedAndMergedRanges;
652    }
653
654    void initialize(boolean reversed) {
655      // Avoid double initialization
656      assert !this.initialized;
657      this.reversed = reversed;
658      if (reversed) {
659        // If we are doing a reverse scan, we can reverse the ranges (both the elements in
660        // the list as well as their start/stop key), and use the same algorithm.
661        this.ranges = flipAndReverseRanges(sortedAndMergedRanges);
662      } else {
663        this.ranges = sortedAndMergedRanges;
664      }
665      this.initialized = true;
666    }
667
668    /**
669     * Rebuilds the sorted ranges (by startKey) into an equivalent sorted list of ranges, only by
670     * stopKey instead. Descending order and the ReversedRowRange compareTo implementation make sure
671     * that we can use Collections.binarySearch().
672     */
673    static List<ReversedRowRange> flipAndReverseRanges(List<RowRange> ranges) {
674      List<ReversedRowRange> flippedRanges = new ArrayList<>(ranges.size());
675      for (int i = ranges.size() - 1; i >= 0; i--) {
676        RowRange origRange = ranges.get(i);
677        ReversedRowRange newRowRange = new ReversedRowRange(origRange.startRow,
678          origRange.startRowInclusive, origRange.stopRow, origRange.isStopRowInclusive());
679        flippedRanges.add(newRowRange);
680      }
681      return flippedRanges;
682    }
683
684    /**
685     * Calculates the position where the given rowkey fits in the ranges list.
686     * @param rowKey the row key to calculate
687     * @return index the position of the row key
688     */
689    public int getNextRangeIndex(byte[] rowKey) {
690      BasicRowRange temp;
691      if (reversed) {
692        temp = new ReversedRowRange(null, true, rowKey, true);
693      } else {
694        temp = new RowRange(rowKey, true, null, true);
695      }
696      // Because we make sure that `ranges` has the correct natural ordering (given it containing
697      // RowRange or ReverseRowRange objects). This keeps us from having to have two different
698      // implementations below.
699      final int index = Collections.binarySearch(ranges, temp);
700      if (index < 0) {
701        int insertionPosition = -index - 1;
702        // check if the row key in the range before the insertion position
703        if (insertionPosition != 0 && ranges.get(insertionPosition - 1).contains(rowKey)) {
704          return insertionPosition - 1;
705        }
706        // check if the row key is before the first range
707        if (insertionPosition == 0 && !ranges.get(insertionPosition).contains(rowKey)) {
708          return ROW_BEFORE_FIRST_RANGE;
709        }
710        if (!foundFirstRange) {
711          foundFirstRange = true;
712        }
713        return insertionPosition;
714      }
715      // the row key equals one of the start keys, and the the range exclude the start key
716      if (ranges.get(index).isSearchRowInclusive() == false) {
717        exclusive = true;
718      }
719      return index;
720    }
721
722    /**
723     * Sets {@link #foundFirstRange} to {@code true}, indicating that we found a matching row range.
724     */
725    public void setFoundFirstRange() {
726      this.foundFirstRange = true;
727    }
728
729    /**
730     * Gets the RowRange at the given offset.
731     */
732    @SuppressWarnings({ "unchecked", "TypeParameterUnusedInFormals" })
733    public <T extends BasicRowRange> T get(int i) {
734      return (T) ranges.get(i);
735    }
736
737    /**
738     * Returns true if the first matching row range was found.
739     */
740    public boolean hasFoundFirstRange() {
741      return foundFirstRange;
742    }
743
744    /**
745     * Returns true if the current range's key is exclusive
746     */
747    public boolean isExclusive() {
748      return exclusive;
749    }
750
751    /**
752     * Resets the exclusive flag.
753     */
754    public void resetExclusive() {
755      exclusive = false;
756    }
757
758    /**
759     * Returns true if this class has been initialized by calling {@link #initialize(boolean)}.
760     */
761    public boolean isInitialized() {
762      return initialized;
763    }
764
765    /**
766     * Returns true if we exhausted searching all row ranges.
767     */
768    public boolean isIterationComplete(int index) {
769      return index >= ranges.size();
770    }
771  }
772
773  @Override
774  public boolean equals(Object obj) {
775    return obj instanceof Filter && areSerializedFieldsEqual((Filter) obj);
776  }
777
778  @Override
779  public int hashCode() {
780    return Objects.hash(this.ranges);
781  }
782}