001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.filter;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.List;
023import java.util.Objects;
024import org.apache.hadoop.hbase.Cell;
025import org.apache.hadoop.hbase.CellUtil;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.PrivateCellUtil;
028import org.apache.hadoop.hbase.client.ClientUtil;
029import org.apache.hadoop.hbase.exceptions.DeserializationException;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
034import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
035
036import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
037
038/**
039 * Filter to support scan multiple row key ranges. It can construct the row key ranges from the
040 * passed list which can be accessed by each region server. HBase is quite efficient when scanning
041 * only one small row key range. If user needs to specify multiple row key ranges in one scan, the
042 * typical solutions are: 1. through FilterList which is a list of row key Filters, 2. using the SQL
043 * layer over HBase to join with two table, such as hive, phoenix etc. However, both solutions are
044 * inefficient. Both of them can't utilize the range info to perform fast forwarding during scan
045 * which is quite time consuming. If the number of ranges are quite big (e.g. millions), join is a
046 * proper solution though it is slow. However, there are cases that user wants to specify a small
047 * number of ranges to scan (e.g. <1000 ranges). Both solutions can't provide satisfactory
048 * performance in such case. MultiRowRangeFilter is to support such usec ase (scan multiple row key
049 * ranges), which can construct the row key ranges from user specified list and perform
050 * fast-forwarding during scan. Thus, the scan will be quite efficient.
051 */
052@InterfaceAudience.Public
053public class MultiRowRangeFilter extends FilterBase {
054
055  private static final int ROW_BEFORE_FIRST_RANGE = -1;
056
057  private final List<RowRange> rangeList;
058  private final RangeIteration ranges;
059
060  private boolean done = false;
061  private int index;
062  private BasicRowRange range;
063  private ReturnCode currentReturnCode;
064
065  /**
066   * @param list A list of <code>RowRange</code>
067   */
068  public MultiRowRangeFilter(List<RowRange> list) {
069    // We don't use rangeList anywhere else, but keeping it lets us pay a little
070    // memory to avoid touching the serialization logic.
071    this.rangeList = Collections.unmodifiableList(sortAndMerge(list));
072    this.ranges = new RangeIteration(rangeList);
073  }
074
075  /**
076   * Constructor for creating a <code>MultiRowRangeFilter</code> from multiple rowkey prefixes. As
077   * <code>MultiRowRangeFilter</code> javadoc says (See the solution 1 of the first statement), if
078   * you try to create a filter list that scans row keys corresponding to given prefixes (e.g.,
079   * <code>FilterList</code> composed of multiple <code>PrefixFilter</code>s), this constructor
080   * provides a way to avoid creating an inefficient one.
081   * @param rowKeyPrefixes the array of byte array
082   */
083  public MultiRowRangeFilter(byte[][] rowKeyPrefixes) {
084    this(createRangeListFromRowKeyPrefixes(rowKeyPrefixes));
085  }
086
087  private static List<RowRange> createRangeListFromRowKeyPrefixes(byte[][] rowKeyPrefixes) {
088    if (rowKeyPrefixes == null) {
089      throw new IllegalArgumentException("Invalid rowkey prefixes");
090    }
091
092    List<RowRange> list = new ArrayList<>();
093    for (byte[] rowKeyPrefix : rowKeyPrefixes) {
094      byte[] stopRow = ClientUtil.calculateTheClosestNextRowKeyForPrefix(rowKeyPrefix);
095      list.add(new RowRange(rowKeyPrefix, true, stopRow, false));
096    }
097    return list;
098  }
099
100  public List<RowRange> getRowRanges() {
101    // Used by hbase-rest
102    return this.rangeList;
103  }
104
105  @Override
106  public boolean filterAllRemaining() {
107    return done;
108  }
109
110  @Override
111  public boolean filterRowKey(Cell firstRowCell) {
112    if (filterAllRemaining()) return true;
113
114    // N.b. We can only do this after we're iterating over records. If we try to do
115    // it before, the Scan (and this filter) may not yet be fully initialized. This is a
116    // wart on Filter and something that'd be nice to clean up (like CP's in HBase2.0)
117    if (!ranges.isInitialized()) {
118      ranges.initialize(isReversed());
119    }
120
121    // If it is the first time of running, calculate the current range index for
122    // the row key. If index is out of bound which happens when the start row
123    // user sets is after the largest stop row of the ranges, stop the scan.
124    // If row key is after the current range, find the next range and update index.
125    byte[] rowArr = firstRowCell.getRowArray();
126    int length = firstRowCell.getRowLength();
127    int offset = firstRowCell.getRowOffset();
128    if (!ranges.hasFoundFirstRange() || !range.contains(rowArr, offset, length)) {
129      byte[] rowkey = CellUtil.cloneRow(firstRowCell);
130      index = ranges.getNextRangeIndex(rowkey);
131      if (ranges.isIterationComplete(index)) {
132        done = true;
133        currentReturnCode = ReturnCode.NEXT_ROW;
134        return false;
135      }
136      if (index != ROW_BEFORE_FIRST_RANGE) {
137        range = ranges.get(index);
138      } else {
139        range = ranges.get(0);
140      }
141      if (ranges.isExclusive()) {
142        ranges.resetExclusive();
143        currentReturnCode = ReturnCode.NEXT_ROW;
144        return false;
145      }
146      if (!ranges.hasFoundFirstRange()) {
147        if (index != ROW_BEFORE_FIRST_RANGE) {
148          currentReturnCode = ReturnCode.INCLUDE;
149        } else {
150          currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
151        }
152        ranges.setFoundFirstRange();
153      } else {
154        if (range.contains(rowArr, offset, length)) {
155          currentReturnCode = ReturnCode.INCLUDE;
156        } else {
157          currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
158        }
159      }
160    } else {
161      currentReturnCode = ReturnCode.INCLUDE;
162    }
163    return false;
164  }
165
166  @Override
167  public ReturnCode filterCell(final Cell ignored) {
168    return currentReturnCode;
169  }
170
171  @Override
172  public Cell getNextCellHint(Cell currentKV) {
173    // skip to the next range's start row
174    // #getComparisonData lets us avoid the `if (reversed)` branch
175    byte[] comparisonData = range.getComparisonData();
176    return PrivateCellUtil.createFirstOnRow(comparisonData, 0, (short) comparisonData.length);
177  }
178
179  /** Returns The filter serialized using pb */
180  @Override
181  public byte[] toByteArray() {
182    FilterProtos.MultiRowRangeFilter.Builder builder =
183      FilterProtos.MultiRowRangeFilter.newBuilder();
184    for (RowRange range : rangeList) {
185      if (range != null) {
186        FilterProtos.RowRange.Builder rangebuilder = FilterProtos.RowRange.newBuilder();
187        if (range.startRow != null)
188          rangebuilder.setStartRow(UnsafeByteOperations.unsafeWrap(range.startRow));
189        rangebuilder.setStartRowInclusive(range.startRowInclusive);
190        if (range.stopRow != null)
191          rangebuilder.setStopRow(UnsafeByteOperations.unsafeWrap(range.stopRow));
192        rangebuilder.setStopRowInclusive(range.stopRowInclusive);
193        builder.addRowRangeList(rangebuilder.build());
194      }
195    }
196    return builder.build().toByteArray();
197  }
198
199  /**
200   * Parse a serialized representation of {@link MultiRowRangeFilter}
201   * @param pbBytes A pb serialized instance
202   * @return An instance of {@link MultiRowRangeFilter}
203   * @throws DeserializationException if an error occurred
204   * @see #toByteArray
205   */
206  public static MultiRowRangeFilter parseFrom(final byte[] pbBytes)
207    throws DeserializationException {
208    FilterProtos.MultiRowRangeFilter proto;
209    try {
210      proto = FilterProtos.MultiRowRangeFilter.parseFrom(pbBytes);
211    } catch (InvalidProtocolBufferException e) {
212      throw new DeserializationException(e);
213    }
214    int length = proto.getRowRangeListCount();
215    List<FilterProtos.RowRange> rangeProtos = proto.getRowRangeListList();
216    List<RowRange> rangeList = new ArrayList<>(length);
217    for (FilterProtos.RowRange rangeProto : rangeProtos) {
218      RowRange range =
219        new RowRange(rangeProto.hasStartRow() ? rangeProto.getStartRow().toByteArray() : null,
220          rangeProto.getStartRowInclusive(),
221          rangeProto.hasStopRow() ? rangeProto.getStopRow().toByteArray() : null,
222          rangeProto.getStopRowInclusive());
223      rangeList.add(range);
224    }
225    return new MultiRowRangeFilter(rangeList);
226  }
227
228  /**
229   * Returns true if and only if the fields of the filter that are serialized are equal to the
230   * corresponding fields in other. Used for testing.
231   */
232  @Override
233  boolean areSerializedFieldsEqual(Filter o) {
234    if (o == this) {
235      return true;
236    }
237    if (!(o instanceof MultiRowRangeFilter)) {
238      return false;
239    }
240    MultiRowRangeFilter other = (MultiRowRangeFilter) o;
241    if (this.rangeList.size() != other.rangeList.size()) return false;
242    for (int i = 0; i < rangeList.size(); ++i) {
243      RowRange thisRange = this.rangeList.get(i);
244      RowRange otherRange = other.rangeList.get(i);
245      if (
246        !(Bytes.equals(thisRange.startRow, otherRange.startRow)
247          && Bytes.equals(thisRange.stopRow, otherRange.stopRow)
248          && (thisRange.startRowInclusive == otherRange.startRowInclusive)
249          && (thisRange.stopRowInclusive == otherRange.stopRowInclusive))
250      ) {
251        return false;
252      }
253    }
254    return true;
255  }
256
257  /**
258   * sort the ranges and if the ranges with overlap, then merge them.
259   * @param ranges the list of ranges to sort and merge.
260   * @return the ranges after sort and merge.
261   */
262  public static List<RowRange> sortAndMerge(List<RowRange> ranges) {
263    if (ranges.isEmpty()) {
264      throw new IllegalArgumentException("No ranges found.");
265    }
266    List<RowRange> invalidRanges = new ArrayList<>();
267    List<RowRange> newRanges = new ArrayList<>(ranges.size());
268    Collections.sort(ranges);
269    if (ranges.get(0).isValid()) {
270      if (ranges.size() == 1) {
271        newRanges.add(ranges.get(0));
272      }
273    } else {
274      invalidRanges.add(ranges.get(0));
275    }
276
277    byte[] lastStartRow = ranges.get(0).startRow;
278    boolean lastStartRowInclusive = ranges.get(0).startRowInclusive;
279    byte[] lastStopRow = ranges.get(0).stopRow;
280    boolean lastStopRowInclusive = ranges.get(0).stopRowInclusive;
281    int i = 1;
282    for (; i < ranges.size(); i++) {
283      RowRange range = ranges.get(i);
284      if (!range.isValid()) {
285        invalidRanges.add(range);
286      }
287      if (Bytes.equals(lastStopRow, HConstants.EMPTY_BYTE_ARRAY)) {
288        newRanges.add(
289          new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
290        break;
291      }
292      // with overlap in the ranges
293      if (
294        (Bytes.compareTo(lastStopRow, range.startRow) > 0)
295          || (Bytes.compareTo(lastStopRow, range.startRow) == 0
296            && !(lastStopRowInclusive == false && range.isStartRowInclusive() == false))
297      ) {
298        if (Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
299          newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
300            range.stopRowInclusive));
301          break;
302        }
303        // if first range contains second range, ignore the second range
304        if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
305          if ((Bytes.compareTo(lastStopRow, range.stopRow) == 0)) {
306            if (lastStopRowInclusive == true || range.stopRowInclusive == true) {
307              lastStopRowInclusive = true;
308            }
309          }
310          if ((i + 1) == ranges.size()) {
311            newRanges.add(
312              new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
313          }
314        } else {
315          lastStopRow = range.stopRow;
316          lastStopRowInclusive = range.stopRowInclusive;
317          if ((i + 1) < ranges.size()) {
318            i++;
319            range = ranges.get(i);
320            if (!range.isValid()) {
321              invalidRanges.add(range);
322            }
323          } else {
324            newRanges.add(
325              new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
326            break;
327          }
328          while (
329            (Bytes.compareTo(lastStopRow, range.startRow) > 0)
330              || (Bytes.compareTo(lastStopRow, range.startRow) == 0
331                && (lastStopRowInclusive == true || range.startRowInclusive == true))
332          ) {
333            if (Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
334              break;
335            }
336            // if this first range contain second range, ignore the second range
337            if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
338              if (lastStopRowInclusive == true || range.stopRowInclusive == true) {
339                lastStopRowInclusive = true;
340              }
341              i++;
342              if (i < ranges.size()) {
343                range = ranges.get(i);
344                if (!range.isValid()) {
345                  invalidRanges.add(range);
346                }
347              } else {
348                break;
349              }
350            } else {
351              lastStopRow = range.stopRow;
352              lastStopRowInclusive = range.stopRowInclusive;
353              i++;
354              if (i < ranges.size()) {
355                range = ranges.get(i);
356                if (!range.isValid()) {
357                  invalidRanges.add(range);
358                }
359              } else {
360                break;
361              }
362            }
363          }
364          if (Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
365            if (
366              (Bytes.compareTo(lastStopRow, range.startRow) < 0)
367                || (Bytes.compareTo(lastStopRow, range.startRow) == 0
368                  && lastStopRowInclusive == false && range.startRowInclusive == false)
369            ) {
370              newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
371                lastStopRowInclusive));
372              newRanges.add(range);
373            } else {
374              newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
375                range.stopRowInclusive));
376              break;
377            }
378          }
379          newRanges.add(
380            new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
381          if ((i + 1) == ranges.size()) {
382            newRanges.add(range);
383          }
384          lastStartRow = range.startRow;
385          lastStartRowInclusive = range.startRowInclusive;
386          lastStopRow = range.stopRow;
387          lastStopRowInclusive = range.stopRowInclusive;
388        }
389      } else {
390        newRanges.add(
391          new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow, lastStopRowInclusive));
392        if ((i + 1) == ranges.size()) {
393          newRanges.add(range);
394        }
395        lastStartRow = range.startRow;
396        lastStartRowInclusive = range.startRowInclusive;
397        lastStopRow = range.stopRow;
398        lastStopRowInclusive = range.stopRowInclusive;
399      }
400    }
401    // check the remaining ranges
402    for (int j = i; j < ranges.size(); j++) {
403      if (!ranges.get(j).isValid()) {
404        invalidRanges.add(ranges.get(j));
405      }
406    }
407    // if invalid range exists, throw the exception
408    if (invalidRanges.size() != 0) {
409      throwExceptionForInvalidRanges(invalidRanges, true);
410    }
411    // If no valid ranges found, throw the exception
412    if (newRanges.isEmpty()) {
413      throw new IllegalArgumentException("No valid ranges found.");
414    }
415    return newRanges;
416  }
417
418  private static void throwExceptionForInvalidRanges(List<RowRange> invalidRanges,
419    boolean details) {
420    StringBuilder sb = new StringBuilder();
421    sb.append(invalidRanges.size()).append(" invaild ranges.\n");
422    if (details) {
423      for (RowRange range : invalidRanges) {
424        sb.append("Invalid range: start row => " + Bytes.toString(range.startRow) + ", stop row => "
425          + Bytes.toString(range.stopRow)).append('\n');
426      }
427    }
428    throw new IllegalArgumentException(sb.toString());
429  }
430
431  private static abstract class BasicRowRange implements Comparable<BasicRowRange> {
432    protected byte[] startRow;
433    protected boolean startRowInclusive = true;
434    protected byte[] stopRow;
435    protected boolean stopRowInclusive = false;
436
437    public BasicRowRange() {
438    }
439
440    /**
441     * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
442     * start row of the table. If the stopRow is empty or null, set it to
443     * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
444     */
445    public BasicRowRange(String startRow, boolean startRowInclusive, String stopRow,
446      boolean stopRowInclusive) {
447      this(
448        (startRow == null || startRow.isEmpty())
449          ? HConstants.EMPTY_BYTE_ARRAY
450          : Bytes.toBytes(startRow),
451        startRowInclusive,
452        (stopRow == null || stopRow.isEmpty())
453          ? HConstants.EMPTY_BYTE_ARRAY
454          : Bytes.toBytes(stopRow),
455        stopRowInclusive);
456    }
457
458    public BasicRowRange(byte[] startRow, boolean startRowInclusive, byte[] stopRow,
459      boolean stopRowInclusive) {
460      this.startRow = (startRow == null) ? HConstants.EMPTY_BYTE_ARRAY : startRow;
461      this.startRowInclusive = startRowInclusive;
462      this.stopRow = (stopRow == null) ? HConstants.EMPTY_BYTE_ARRAY : stopRow;
463      this.stopRowInclusive = stopRowInclusive;
464    }
465
466    public byte[] getStartRow() {
467      return startRow;
468    }
469
470    public byte[] getStopRow() {
471      return stopRow;
472    }
473
474    /** Returns if start row is inclusive. */
475    public boolean isStartRowInclusive() {
476      return startRowInclusive;
477    }
478
479    /** Returns if stop row is inclusive. */
480    public boolean isStopRowInclusive() {
481      return stopRowInclusive;
482    }
483
484    public boolean contains(byte[] row) {
485      return contains(row, 0, row.length);
486    }
487
488    public boolean contains(byte[] buffer, int offset, int length) {
489      if (startRowInclusive) {
490        if (stopRowInclusive) {
491          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
492            && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
493              || Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
494        } else {
495          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
496            && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
497              || Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
498        }
499      } else {
500        if (stopRowInclusive) {
501          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
502            && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
503              || Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
504        } else {
505          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
506            && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
507              || Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
508        }
509      }
510    }
511
512    public boolean isValid() {
513      return Bytes.equals(startRow, HConstants.EMPTY_BYTE_ARRAY)
514        || Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
515        || Bytes.compareTo(startRow, stopRow) < 0
516        || (Bytes.compareTo(startRow, stopRow) == 0 && stopRowInclusive == true);
517    }
518
519    @Override
520    public boolean equals(Object obj) {
521      if (!(obj instanceof BasicRowRange)) {
522        return false;
523      }
524      if (this == obj) {
525        return true;
526      }
527      BasicRowRange rr = (BasicRowRange) obj;
528      return Bytes.equals(this.stopRow, rr.getStopRow())
529        && Bytes.equals(this.startRow, this.getStartRow())
530        && this.startRowInclusive == rr.isStartRowInclusive()
531        && this.stopRowInclusive == rr.isStopRowInclusive();
532    }
533
534    @Override
535    public int hashCode() {
536      return Objects.hash(Bytes.hashCode(this.stopRow), Bytes.hashCode(this.startRow),
537        this.startRowInclusive, this.stopRowInclusive);
538    }
539
540    /**
541     * Returns the data to be used to compare {@code this} to another object.
542     */
543    public abstract byte[] getComparisonData();
544
545    /**
546     * Returns whether the bounding row used for binary-search is inclusive or not. For forward
547     * scans, we would check the starRow, but we would check the stopRow for the reverse scan case.
548     */
549    public abstract boolean isSearchRowInclusive();
550
551    @Override
552    public int compareTo(BasicRowRange other) {
553      byte[] left;
554      byte[] right;
555      if (isAscendingOrder()) {
556        left = this.getComparisonData();
557        right = other.getComparisonData();
558      } else {
559        left = other.getComparisonData();
560        right = this.getComparisonData();
561      }
562      return Bytes.compareTo(left, right);
563    }
564
565    public abstract boolean isAscendingOrder();
566  }
567
568  /**
569   * Internal RowRange that reverses the sort-order to handle reverse scans.
570   */
571  @InterfaceAudience.Private
572  private static class ReversedRowRange extends BasicRowRange {
573    public ReversedRowRange(byte[] startRow, boolean startRowInclusive, byte[] stopRow,
574      boolean stopRowInclusive) {
575      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
576    }
577
578    @Override
579    public byte[] getComparisonData() {
580      return this.stopRow;
581    }
582
583    @Override
584    public boolean isSearchRowInclusive() {
585      return this.stopRowInclusive;
586    }
587
588    @Override
589    public boolean isAscendingOrder() {
590      return false;
591    }
592  }
593
594  @InterfaceAudience.Public
595  public static class RowRange extends BasicRowRange {
596    public RowRange() {
597    }
598
599    /**
600     * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
601     * start row of the table. If the stopRow is empty or null, set it to
602     * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
603     */
604    public RowRange(String startRow, boolean startRowInclusive, String stopRow,
605      boolean stopRowInclusive) {
606      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
607    }
608
609    public RowRange(byte[] startRow, boolean startRowInclusive, byte[] stopRow,
610      boolean stopRowInclusive) {
611      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
612    }
613
614    @Override
615    public byte[] getComparisonData() {
616      return startRow;
617    }
618
619    @Override
620    public boolean isSearchRowInclusive() {
621      return startRowInclusive;
622    }
623
624    @Override
625    public boolean isAscendingOrder() {
626      return true;
627    }
628  }
629
630  /**
631   * Abstraction over the ranges of rows to return from this filter, regardless of forward or
632   * reverse scans being used. This Filter can use this class, agnostic of iteration direction, as
633   * the same algorithm can be applied in both cases.
634   */
635  @InterfaceAudience.Private
636  private static class RangeIteration {
637    private boolean exclusive = false;
638    private boolean initialized = false;
639    private boolean foundFirstRange = false;
640    private boolean reversed = false;
641    private final List<RowRange> sortedAndMergedRanges;
642    private List<? extends BasicRowRange> ranges;
643
644    public RangeIteration(List<RowRange> sortedAndMergedRanges) {
645      this.sortedAndMergedRanges = sortedAndMergedRanges;
646    }
647
648    void initialize(boolean reversed) {
649      // Avoid double initialization
650      assert !this.initialized;
651      this.reversed = reversed;
652      if (reversed) {
653        // If we are doing a reverse scan, we can reverse the ranges (both the elements in
654        // the list as well as their start/stop key), and use the same algorithm.
655        this.ranges = flipAndReverseRanges(sortedAndMergedRanges);
656      } else {
657        this.ranges = sortedAndMergedRanges;
658      }
659      this.initialized = true;
660    }
661
662    /**
663     * Rebuilds the sorted ranges (by startKey) into an equivalent sorted list of ranges, only by
664     * stopKey instead. Descending order and the ReversedRowRange compareTo implementation make sure
665     * that we can use Collections.binarySearch().
666     */
667    static List<ReversedRowRange> flipAndReverseRanges(List<RowRange> ranges) {
668      List<ReversedRowRange> flippedRanges = new ArrayList<>(ranges.size());
669      for (int i = ranges.size() - 1; i >= 0; i--) {
670        RowRange origRange = ranges.get(i);
671        ReversedRowRange newRowRange = new ReversedRowRange(origRange.startRow,
672          origRange.startRowInclusive, origRange.stopRow, origRange.isStopRowInclusive());
673        flippedRanges.add(newRowRange);
674      }
675      return flippedRanges;
676    }
677
678    /**
679     * Calculates the position where the given rowkey fits in the ranges list.
680     * @param rowKey the row key to calculate
681     * @return index the position of the row key
682     */
683    public int getNextRangeIndex(byte[] rowKey) {
684      BasicRowRange temp;
685      if (reversed) {
686        temp = new ReversedRowRange(null, true, rowKey, true);
687      } else {
688        temp = new RowRange(rowKey, true, null, true);
689      }
690      // Because we make sure that `ranges` has the correct natural ordering (given it containing
691      // RowRange or ReverseRowRange objects). This keeps us from having to have two different
692      // implementations below.
693      final int index = Collections.binarySearch(ranges, temp);
694      if (index < 0) {
695        int insertionPosition = -index - 1;
696        // check if the row key in the range before the insertion position
697        if (insertionPosition != 0 && ranges.get(insertionPosition - 1).contains(rowKey)) {
698          return insertionPosition - 1;
699        }
700        // check if the row key is before the first range
701        if (insertionPosition == 0 && !ranges.get(insertionPosition).contains(rowKey)) {
702          return ROW_BEFORE_FIRST_RANGE;
703        }
704        if (!foundFirstRange) {
705          foundFirstRange = true;
706        }
707        return insertionPosition;
708      }
709      // the row key equals one of the start keys, and the the range exclude the start key
710      if (ranges.get(index).isSearchRowInclusive() == false) {
711        exclusive = true;
712      }
713      return index;
714    }
715
716    /**
717     * Sets {@link #foundFirstRange} to {@code true}, indicating that we found a matching row range.
718     */
719    public void setFoundFirstRange() {
720      this.foundFirstRange = true;
721    }
722
723    /**
724     * Gets the RowRange at the given offset.
725     */
726    @SuppressWarnings({ "unchecked", "TypeParameterUnusedInFormals" })
727    public <T extends BasicRowRange> T get(int i) {
728      return (T) ranges.get(i);
729    }
730
731    /**
732     * Returns true if the first matching row range was found.
733     */
734    public boolean hasFoundFirstRange() {
735      return foundFirstRange;
736    }
737
738    /**
739     * Returns true if the current range's key is exclusive
740     */
741    public boolean isExclusive() {
742      return exclusive;
743    }
744
745    /**
746     * Resets the exclusive flag.
747     */
748    public void resetExclusive() {
749      exclusive = false;
750    }
751
752    /**
753     * Returns true if this class has been initialized by calling {@link #initialize(boolean)}.
754     */
755    public boolean isInitialized() {
756      return initialized;
757    }
758
759    /**
760     * Returns true if we exhausted searching all row ranges.
761     */
762    public boolean isIterationComplete(int index) {
763      return index >= ranges.size();
764    }
765  }
766
767  @Override
768  public boolean equals(Object obj) {
769    return obj instanceof Filter && areSerializedFieldsEqual((Filter) obj);
770  }
771
772  @Override
773  public int hashCode() {
774    return Objects.hash(this.ranges);
775  }
776}