001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.filter;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.List;
023import java.util.Objects;
024
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.hadoop.hbase.exceptions.DeserializationException;
031import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
032import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
033import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
034import org.apache.hadoop.hbase.util.Bytes;
035
036/**
037 * Filter to support scan multiple row key ranges. It can construct the row key ranges from the
038 * passed list which can be accessed by each region server.
039 *
040 * HBase is quite efficient when scanning only one small row key range. If user needs to specify
041 * multiple row key ranges in one scan, the typical solutions are: 1. through FilterList which is a
042 * list of row key Filters, 2. using the SQL layer over HBase to join with two table, such as hive,
043 * phoenix etc. However, both solutions are inefficient. Both of them can't utilize the range info
044 * to perform fast forwarding during scan which is quite time consuming. If the number of ranges
045 * are quite big (e.g. millions), join is a proper solution though it is slow. However, there are
046 * cases that user wants to specify a small number of ranges to scan (e.g. <1000 ranges). Both
047 * solutions can't provide satisfactory performance in such case. MultiRowRangeFilter is to support
048 * such usec ase (scan multiple row key ranges), which can construct the row key ranges from user
049 * specified list and perform fast-forwarding during scan. Thus, the scan will be quite efficient.
050 */
051@InterfaceAudience.Public
052public class MultiRowRangeFilter extends FilterBase {
053
054  private static final int ROW_BEFORE_FIRST_RANGE = -1;
055
056  private final List<RowRange> rangeList;
057  private final RangeIteration ranges;
058
059  private boolean done = false;
060  private int index;
061  private BasicRowRange range;
062  private ReturnCode currentReturnCode;
063
064  /**
065   * @param list A list of <code>RowRange</code>
066   */
067  public MultiRowRangeFilter(List<RowRange> list) {
068    // We don't use rangeList anywhere else, but keeping it lets us pay a little
069    // memory to avoid touching the serialization logic.
070    this.rangeList = Collections.unmodifiableList(sortAndMerge(list));
071    this.ranges = new RangeIteration(rangeList);
072  }
073
074  public List<RowRange> getRowRanges() {
075    // Used by hbase-rest
076    return this.rangeList;
077  }
078
079  @Override
080  public boolean filterAllRemaining() {
081    return done;
082  }
083
084  @Override
085  public boolean filterRowKey(Cell firstRowCell) {
086    if (filterAllRemaining()) return true;
087
088    // N.b. We can only do this after we're iterating over records. If we try to do
089    // it before, the Scan (and this filter) may not yet be fully initialized. This is a
090    // wart on Filter and something that'd be nice to clean up (like CP's in HBase2.0)
091    if (!ranges.isInitialized()) {
092      ranges.initialize(isReversed());
093    }
094
095    // If it is the first time of running, calculate the current range index for
096    // the row key. If index is out of bound which happens when the start row
097    // user sets is after the largest stop row of the ranges, stop the scan.
098    // If row key is after the current range, find the next range and update index.
099    byte[] rowArr = firstRowCell.getRowArray();
100    int length = firstRowCell.getRowLength();
101    int offset = firstRowCell.getRowOffset();
102    if (!ranges.hasFoundFirstRange() || !range.contains(rowArr, offset, length)) {
103      byte[] rowkey = CellUtil.cloneRow(firstRowCell);
104      index = ranges.getNextRangeIndex(rowkey);
105      if (ranges.isIterationComplete(index)) {
106        done = true;
107        currentReturnCode = ReturnCode.NEXT_ROW;
108        return false;
109      }
110      if(index != ROW_BEFORE_FIRST_RANGE) {
111        range = ranges.get(index);
112      } else {
113        range = ranges.get(0);
114      }
115      if (ranges.isExclusive()) {
116        ranges.resetExclusive();
117        currentReturnCode = ReturnCode.NEXT_ROW;
118        return false;
119      }
120      if (!ranges.hasFoundFirstRange()) {
121        if(index != ROW_BEFORE_FIRST_RANGE) {
122          currentReturnCode = ReturnCode.INCLUDE;
123        } else {
124          currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
125        }
126        ranges.setFoundFirstRange();
127      } else {
128        if (range.contains(rowArr, offset, length)) {
129          currentReturnCode = ReturnCode.INCLUDE;
130        } else {
131          currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
132        }
133      }
134    } else {
135      currentReturnCode = ReturnCode.INCLUDE;
136    }
137    return false;
138  }
139
140  @Deprecated
141  @Override
142  public ReturnCode filterKeyValue(final Cell ignored) {
143    return filterCell(ignored);
144  }
145
146  @Override
147  public ReturnCode filterCell(final Cell ignored) {
148    return currentReturnCode;
149  }
150
151  @Override
152  public Cell getNextCellHint(Cell currentKV) {
153    // skip to the next range's start row
154    // #getComparisonData lets us avoid the `if (reversed)` branch
155    byte[] comparisonData = range.getComparisonData();
156    return PrivateCellUtil.createFirstOnRow(comparisonData, 0, (short) comparisonData.length);
157  }
158
159  /**
160   * @return The filter serialized using pb
161   */
162  @Override
163  public byte[] toByteArray() {
164    FilterProtos.MultiRowRangeFilter.Builder builder = FilterProtos.MultiRowRangeFilter
165        .newBuilder();
166    for (RowRange range : rangeList) {
167      if (range != null) {
168        FilterProtos.RowRange.Builder rangebuilder = FilterProtos.RowRange.newBuilder();
169        if (range.startRow != null)
170          rangebuilder.setStartRow(UnsafeByteOperations.unsafeWrap(range.startRow));
171        rangebuilder.setStartRowInclusive(range.startRowInclusive);
172        if (range.stopRow != null)
173          rangebuilder.setStopRow(UnsafeByteOperations.unsafeWrap(range.stopRow));
174        rangebuilder.setStopRowInclusive(range.stopRowInclusive);
175        builder.addRowRangeList(rangebuilder.build());
176      }
177    }
178    return builder.build().toByteArray();
179  }
180
181  /**
182   * @param pbBytes A pb serialized instance
183   * @return An instance of MultiRowRangeFilter
184   * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
185   */
186  public static MultiRowRangeFilter parseFrom(final byte[] pbBytes)
187      throws DeserializationException {
188    FilterProtos.MultiRowRangeFilter proto;
189    try {
190      proto = FilterProtos.MultiRowRangeFilter.parseFrom(pbBytes);
191    } catch (InvalidProtocolBufferException e) {
192      throw new DeserializationException(e);
193    }
194    int length = proto.getRowRangeListCount();
195    List<FilterProtos.RowRange> rangeProtos = proto.getRowRangeListList();
196    List<RowRange> rangeList = new ArrayList<>(length);
197    for (FilterProtos.RowRange rangeProto : rangeProtos) {
198      RowRange range = new RowRange(rangeProto.hasStartRow() ? rangeProto.getStartRow()
199          .toByteArray() : null, rangeProto.getStartRowInclusive(), rangeProto.hasStopRow() ?
200              rangeProto.getStopRow().toByteArray() : null, rangeProto.getStopRowInclusive());
201      rangeList.add(range);
202    }
203    return new MultiRowRangeFilter(rangeList);
204  }
205
206  /**
207   * @param o the filter to compare
208   * @return true if and only if the fields of the filter that are serialized are equal to the
209   *         corresponding fields in other. Used for testing.
210   */
211  @Override
212  boolean areSerializedFieldsEqual(Filter o) {
213    if (o == this)
214      return true;
215    if (!(o instanceof MultiRowRangeFilter))
216      return false;
217
218    MultiRowRangeFilter other = (MultiRowRangeFilter) o;
219    if (this.rangeList.size() != other.rangeList.size())
220      return false;
221    for (int i = 0; i < rangeList.size(); ++i) {
222      RowRange thisRange = this.rangeList.get(i);
223      RowRange otherRange = other.rangeList.get(i);
224      if (!(Bytes.equals(thisRange.startRow, otherRange.startRow) && Bytes.equals(
225          thisRange.stopRow, otherRange.stopRow) && (thisRange.startRowInclusive ==
226          otherRange.startRowInclusive) && (thisRange.stopRowInclusive ==
227          otherRange.stopRowInclusive))) {
228        return false;
229      }
230    }
231    return true;
232  }
233
234  /**
235   * sort the ranges and if the ranges with overlap, then merge them.
236   *
237   * @param ranges the list of ranges to sort and merge.
238   * @return the ranges after sort and merge.
239   */
240  public static List<RowRange> sortAndMerge(List<RowRange> ranges) {
241    if (ranges.isEmpty()) {
242      throw new IllegalArgumentException("No ranges found.");
243    }
244    List<RowRange> invalidRanges = new ArrayList<>();
245    List<RowRange> newRanges = new ArrayList<>(ranges.size());
246    Collections.sort(ranges);
247    if(ranges.get(0).isValid()) {
248      if (ranges.size() == 1) {
249        newRanges.add(ranges.get(0));
250      }
251    } else {
252      invalidRanges.add(ranges.get(0));
253    }
254
255    byte[] lastStartRow = ranges.get(0).startRow;
256    boolean lastStartRowInclusive = ranges.get(0).startRowInclusive;
257    byte[] lastStopRow = ranges.get(0).stopRow;
258    boolean lastStopRowInclusive = ranges.get(0).stopRowInclusive;
259    int i = 1;
260    for (; i < ranges.size(); i++) {
261      RowRange range = ranges.get(i);
262      if (!range.isValid()) {
263        invalidRanges.add(range);
264      }
265      if(Bytes.equals(lastStopRow, HConstants.EMPTY_BYTE_ARRAY)) {
266        newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
267            lastStopRowInclusive));
268        break;
269      }
270      // with overlap in the ranges
271      if ((Bytes.compareTo(lastStopRow, range.startRow) > 0) ||
272          (Bytes.compareTo(lastStopRow, range.startRow) == 0 && !(lastStopRowInclusive == false &&
273          range.isStartRowInclusive() == false))) {
274        if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
275          newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
276              range.stopRowInclusive));
277          break;
278        }
279        // if first range contains second range, ignore the second range
280        if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
281          if((Bytes.compareTo(lastStopRow, range.stopRow) == 0)) {
282            if(lastStopRowInclusive == true || range.stopRowInclusive == true) {
283              lastStopRowInclusive = true;
284            }
285          }
286          if ((i + 1) == ranges.size()) {
287            newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
288                lastStopRowInclusive));
289          }
290        } else {
291          lastStopRow = range.stopRow;
292          lastStopRowInclusive = range.stopRowInclusive;
293          if ((i + 1) < ranges.size()) {
294            i++;
295            range = ranges.get(i);
296            if (!range.isValid()) {
297              invalidRanges.add(range);
298            }
299          } else {
300            newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
301                lastStopRowInclusive));
302            break;
303          }
304          while ((Bytes.compareTo(lastStopRow, range.startRow) > 0) ||
305              (Bytes.compareTo(lastStopRow, range.startRow) == 0 &&
306              (lastStopRowInclusive == true || range.startRowInclusive==true))) {
307            if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
308              break;
309            }
310            // if this first range contain second range, ignore the second range
311            if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
312              if(lastStopRowInclusive == true || range.stopRowInclusive == true) {
313                lastStopRowInclusive = true;
314              }
315              i++;
316              if (i < ranges.size()) {
317                range = ranges.get(i);
318                if (!range.isValid()) {
319                  invalidRanges.add(range);
320                }
321              } else {
322                break;
323              }
324            } else {
325              lastStopRow = range.stopRow;
326              lastStopRowInclusive = range.stopRowInclusive;
327              i++;
328              if (i < ranges.size()) {
329                range = ranges.get(i);
330                if (!range.isValid()) {
331                  invalidRanges.add(range);
332                }
333              } else {
334                break;
335              }
336            }
337          }
338          if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
339            if((Bytes.compareTo(lastStopRow, range.startRow) < 0) ||
340                (Bytes.compareTo(lastStopRow, range.startRow) == 0 &&
341                lastStopRowInclusive == false && range.startRowInclusive == false)) {
342              newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
343                  lastStopRowInclusive));
344              newRanges.add(range);
345            } else {
346              newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
347                  range.stopRowInclusive));
348              break;
349            }
350          }
351          newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
352              lastStopRowInclusive));
353          if ((i + 1) == ranges.size()) {
354            newRanges.add(range);
355          }
356          lastStartRow = range.startRow;
357          lastStartRowInclusive = range.startRowInclusive;
358          lastStopRow = range.stopRow;
359          lastStopRowInclusive = range.stopRowInclusive;
360        }
361      } else {
362        newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
363            lastStopRowInclusive));
364        if ((i + 1) == ranges.size()) {
365          newRanges.add(range);
366        }
367        lastStartRow = range.startRow;
368        lastStartRowInclusive = range.startRowInclusive;
369        lastStopRow = range.stopRow;
370        lastStopRowInclusive = range.stopRowInclusive;
371      }
372    }
373    // check the remaining ranges
374    for(int j=i; j < ranges.size(); j++) {
375      if(!ranges.get(j).isValid()) {
376        invalidRanges.add(ranges.get(j));
377      }
378    }
379    // if invalid range exists, throw the exception
380    if (invalidRanges.size() != 0) {
381      throwExceptionForInvalidRanges(invalidRanges, true);
382    }
383    // If no valid ranges found, throw the exception
384    if(newRanges.isEmpty()) {
385      throw new IllegalArgumentException("No valid ranges found.");
386    }
387    return newRanges;
388  }
389
390  private static void throwExceptionForInvalidRanges(List<RowRange> invalidRanges,
391      boolean details) {
392    StringBuilder sb = new StringBuilder();
393    sb.append(invalidRanges.size()).append(" invaild ranges.\n");
394    if (details) {
395      for (RowRange range : invalidRanges) {
396        sb.append(
397            "Invalid range: start row => " + Bytes.toString(range.startRow) + ", stop row => "
398                + Bytes.toString(range.stopRow)).append('\n');
399      }
400    }
401    throw new IllegalArgumentException(sb.toString());
402  }
403
404  private static abstract class BasicRowRange implements Comparable<BasicRowRange> {
405    protected byte[] startRow;
406    protected boolean startRowInclusive = true;
407    protected byte[] stopRow;
408    protected boolean stopRowInclusive = false;
409
410    public BasicRowRange() {
411    }
412    /**
413     * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
414     * start row of the table. If the stopRow is empty or null, set it to
415     * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
416     */
417    public BasicRowRange(String startRow, boolean startRowInclusive, String stopRow,
418        boolean stopRowInclusive) {
419      this((startRow == null || startRow.isEmpty()) ? HConstants.EMPTY_BYTE_ARRAY :
420        Bytes.toBytes(startRow), startRowInclusive,
421        (stopRow == null || stopRow.isEmpty()) ? HConstants.EMPTY_BYTE_ARRAY :
422        Bytes.toBytes(stopRow), stopRowInclusive);
423    }
424
425    public BasicRowRange(byte[] startRow,  boolean startRowInclusive, byte[] stopRow,
426        boolean stopRowInclusive) {
427      this.startRow = (startRow == null) ? HConstants.EMPTY_BYTE_ARRAY : startRow;
428      this.startRowInclusive = startRowInclusive;
429      this.stopRow = (stopRow == null) ? HConstants.EMPTY_BYTE_ARRAY :stopRow;
430      this.stopRowInclusive = stopRowInclusive;
431    }
432
433    public byte[] getStartRow() {
434      return startRow;
435    }
436
437    public byte[] getStopRow() {
438      return stopRow;
439    }
440
441    /**
442     * @return if start row is inclusive.
443     */
444    public boolean isStartRowInclusive() {
445      return startRowInclusive;
446    }
447
448    /**
449     * @return if stop row is inclusive.
450     */
451    public boolean isStopRowInclusive() {
452      return stopRowInclusive;
453    }
454
455    public boolean contains(byte[] row) {
456      return contains(row, 0, row.length);
457    }
458
459    public boolean contains(byte[] buffer, int offset, int length) {
460      if(startRowInclusive) {
461        if(stopRowInclusive) {
462          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
463              && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
464                  Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
465        } else {
466          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
467              && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
468                  Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
469        }
470      } else {
471        if(stopRowInclusive) {
472          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
473              && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
474                  Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
475        } else {
476          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
477              && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
478                  Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
479        }
480      }
481    }
482
483    public boolean isValid() {
484      return Bytes.equals(startRow, HConstants.EMPTY_BYTE_ARRAY)
485          || Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
486          || Bytes.compareTo(startRow, stopRow) < 0
487          || (Bytes.compareTo(startRow, stopRow) == 0 && stopRowInclusive == true);
488    }
489
490    @Override
491    public boolean equals(Object obj){
492      if (!(obj instanceof BasicRowRange)) {
493        return false;
494      }
495      if (this == obj) {
496        return true;
497      }
498      BasicRowRange rr = (BasicRowRange) obj;
499      return Bytes.equals(this.stopRow, rr.getStopRow()) &&
500          Bytes.equals(this.startRow, this.getStartRow()) &&
501          this.startRowInclusive == rr.isStartRowInclusive() &&
502          this.stopRowInclusive == rr.isStopRowInclusive();
503    }
504
505    @Override
506    public int hashCode() {
507      return Objects.hash(Bytes.hashCode(this.stopRow),
508          Bytes.hashCode(this.startRow),
509          this.startRowInclusive,
510          this.stopRowInclusive);
511    }
512
513    /**
514     * Returns the data to be used to compare {@code this} to another object.
515     */
516    public abstract byte[] getComparisonData();
517
518    /**
519     * Returns whether the bounding row used for binary-search is inclusive or not.
520     *
521     * For forward scans, we would check the starRow, but we would check the stopRow for
522     * the reverse scan case.
523     */
524    public abstract boolean isSearchRowInclusive();
525
526    @Override
527    public int compareTo(BasicRowRange other) {
528      byte[] left;
529      byte[] right;
530      if (isAscendingOrder()) {
531        left = this.getComparisonData();
532        right = other.getComparisonData();
533      } else {
534        left = other.getComparisonData();
535        right = this.getComparisonData();
536      }
537      return Bytes.compareTo(left, right);
538    }
539
540    public abstract boolean isAscendingOrder();
541  }
542
543  /**
544   * Internal RowRange that reverses the sort-order to handle reverse scans.
545   */
546  @InterfaceAudience.Private
547  private static class ReversedRowRange extends BasicRowRange {
548    public ReversedRowRange(byte[] startRow,  boolean startRowInclusive, byte[] stopRow,
549        boolean stopRowInclusive) {
550      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
551    }
552
553    @Override
554    public byte[] getComparisonData() {
555      return this.stopRow;
556    }
557
558    @Override
559    public boolean isSearchRowInclusive() {
560      return this.stopRowInclusive;
561    }
562
563    @Override
564    public boolean isAscendingOrder() {
565      return false;
566    }
567  }
568
569  @InterfaceAudience.Public
570  public static class RowRange extends BasicRowRange {
571    public RowRange() {
572    }
573    /**
574     * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
575     * start row of the table. If the stopRow is empty or null, set it to
576     * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
577     */
578    public RowRange(String startRow, boolean startRowInclusive, String stopRow,
579        boolean stopRowInclusive) {
580      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
581    }
582
583    public RowRange(byte[] startRow,  boolean startRowInclusive, byte[] stopRow,
584        boolean stopRowInclusive) {
585      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
586    }
587
588    @Override
589    public byte[] getComparisonData() {
590      return startRow;
591    }
592
593    @Override
594    public boolean isSearchRowInclusive() {
595      return startRowInclusive;
596    }
597
598    @Override
599    public boolean isAscendingOrder() {
600      return true;
601    }
602  }
603
604  /**
605   * Abstraction over the ranges of rows to return from this filter, regardless of forward or
606   * reverse scans being used. This Filter can use this class, agnostic of iteration direction,
607   * as the same algorithm can be applied in both cases.
608   */
609  @InterfaceAudience.Private
610  private static class RangeIteration {
611    private boolean exclusive = false;
612    private boolean initialized = false;
613    private boolean foundFirstRange = false;
614    private boolean reversed = false;
615    private final List<RowRange> sortedAndMergedRanges;
616    private List<? extends BasicRowRange> ranges;
617
618    public RangeIteration(List<RowRange> sortedAndMergedRanges) {
619      this.sortedAndMergedRanges = sortedAndMergedRanges;
620    }
621
622    void initialize(boolean reversed) {
623      // Avoid double initialization
624      assert !this.initialized;
625      this.reversed = reversed;
626      if (reversed) {
627        // If we are doing a reverse scan, we can reverse the ranges (both the elements in
628        // the list as well as their start/stop key), and use the same algorithm.
629        this.ranges = flipAndReverseRanges(sortedAndMergedRanges);
630      } else {
631        this.ranges = sortedAndMergedRanges;
632      }
633      this.initialized = true;
634    }
635
636    /**
637     * Rebuilds the sorted ranges (by startKey) into an equivalent sorted list of ranges, only by
638     * stopKey instead. Descending order and the ReversedRowRange compareTo implementation make
639     * sure that we can use Collections.binarySearch().
640     */
641    static List<ReversedRowRange> flipAndReverseRanges(List<RowRange> ranges) {
642      List<ReversedRowRange> flippedRanges = new ArrayList<>(ranges.size());
643      for (int i = ranges.size() - 1; i >= 0; i--) {
644        RowRange origRange = ranges.get(i);
645        ReversedRowRange newRowRange = new ReversedRowRange(
646            origRange.startRow, origRange.startRowInclusive, origRange.stopRow,
647            origRange.isStopRowInclusive());
648        flippedRanges.add(newRowRange);
649      }
650      return flippedRanges;
651    }
652
653    /**
654     * Calculates the position where the given rowkey fits in the ranges list.
655     *
656     * @param rowKey the row key to calculate
657     * @return index the position of the row key
658     */
659    public int getNextRangeIndex(byte[] rowKey) {
660      BasicRowRange temp;
661      if (reversed) {
662        temp = new ReversedRowRange(null, true, rowKey, true);
663      } else {
664        temp = new RowRange(rowKey, true, null, true);
665      }
666      // Because we make sure that `ranges` has the correct natural ordering (given it containing
667      // RowRange or ReverseRowRange objects). This keeps us from having to have two different
668      // implementations below.
669      final int index = Collections.binarySearch(ranges, temp);
670      if (index < 0) {
671        int insertionPosition = -index - 1;
672        // check if the row key in the range before the insertion position
673        if (insertionPosition != 0 && ranges.get(insertionPosition - 1).contains(rowKey)) {
674          return insertionPosition - 1;
675        }
676        // check if the row key is before the first range
677        if (insertionPosition == 0 && !ranges.get(insertionPosition).contains(rowKey)) {
678          return ROW_BEFORE_FIRST_RANGE;
679        }
680        if (!foundFirstRange) {
681          foundFirstRange = true;
682        }
683        return insertionPosition;
684      }
685      // the row key equals one of the start keys, and the the range exclude the start key
686      if(ranges.get(index).isSearchRowInclusive() == false) {
687        exclusive = true;
688      }
689      return index;
690    }
691
692    /**
693     * Sets {@link #foundFirstRange} to {@code true}, indicating that we found a matching row range.
694     */
695    public void setFoundFirstRange() {
696      this.foundFirstRange = true;
697    }
698
699    /**
700     * Gets the RowRange at the given offset.
701     */
702    @SuppressWarnings("unchecked")
703    public <T extends BasicRowRange> T get(int i) {
704      return (T) ranges.get(i);
705    }
706
707    /**
708     * Returns true if the first matching row range was found.
709     */
710    public boolean hasFoundFirstRange() {
711      return foundFirstRange;
712    }
713
714    /**
715     * Returns true if the current range's key is exclusive
716     */
717    public boolean isExclusive() {
718      return exclusive;
719    }
720
721    /**
722     * Resets the exclusive flag.
723     */
724    public void resetExclusive() {
725      exclusive = false;
726    }
727
728    /**
729     * Returns true if this class has been initialized by calling {@link #initialize(boolean)}.
730     */
731    public boolean isInitialized() {
732      return initialized;
733    }
734
735    /**
736     * Returns true if we exhausted searching all row ranges.
737     */
738    public boolean isIterationComplete(int index) {
739      return index >= ranges.size();
740    }
741  }
742
743  @Override
744  public boolean equals(Object obj) {
745    return obj instanceof Filter && areSerializedFieldsEqual((Filter) obj);
746  }
747
748  @Override
749  public int hashCode() {
750    return Objects.hash(this.ranges);
751  }
752}