001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.filter;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.List;
023import java.util.Objects;
024
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.client.ClientUtil;
030import org.apache.hadoop.hbase.exceptions.DeserializationException;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
034import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
036
037/**
038 * Filter to support scan multiple row key ranges. It can construct the row key ranges from the
039 * passed list which can be accessed by each region server.
040 *
041 * HBase is quite efficient when scanning only one small row key range. If user needs to specify
042 * multiple row key ranges in one scan, the typical solutions are: 1. through FilterList which is a
043 * list of row key Filters, 2. using the SQL layer over HBase to join with two table, such as hive,
044 * phoenix etc. However, both solutions are inefficient. Both of them can't utilize the range info
045 * to perform fast forwarding during scan which is quite time consuming. If the number of ranges
046 * are quite big (e.g. millions), join is a proper solution though it is slow. However, there are
047 * cases that user wants to specify a small number of ranges to scan (e.g. <1000 ranges). Both
048 * solutions can't provide satisfactory performance in such case. MultiRowRangeFilter is to support
049 * such usec ase (scan multiple row key ranges), which can construct the row key ranges from user
050 * specified list and perform fast-forwarding during scan. Thus, the scan will be quite efficient.
051 */
052@InterfaceAudience.Public
053public class MultiRowRangeFilter extends FilterBase {
054
055  private static final int ROW_BEFORE_FIRST_RANGE = -1;
056
057  private final List<RowRange> rangeList;
058  private final RangeIteration ranges;
059
060  private boolean done = false;
061  private int index;
062  private BasicRowRange range;
063  private ReturnCode currentReturnCode;
064
065  /**
066   * @param list A list of <code>RowRange</code>
067   */
068  public MultiRowRangeFilter(List<RowRange> list) {
069    // We don't use rangeList anywhere else, but keeping it lets us pay a little
070    // memory to avoid touching the serialization logic.
071    this.rangeList = Collections.unmodifiableList(sortAndMerge(list));
072    this.ranges = new RangeIteration(rangeList);
073  }
074
075  /**
076   * Constructor for creating a <code>MultiRowRangeFilter</code> from multiple rowkey prefixes.
077   *
078   * As <code>MultiRowRangeFilter</code> javadoc says (See the solution 1 of the first statement),
079   * if you try to create a filter list that scans row keys corresponding to given prefixes (e.g.,
080   * <code>FilterList</code> composed of multiple <code>PrefixFilter</code>s), this constructor
081   * provides a way to avoid creating an inefficient one.
082   *
083   * @param rowKeyPrefixes the array of byte array
084   */
085  public MultiRowRangeFilter(byte[][] rowKeyPrefixes) {
086    this(createRangeListFromRowKeyPrefixes(rowKeyPrefixes));
087  }
088
089  private static List<RowRange> createRangeListFromRowKeyPrefixes(byte[][] rowKeyPrefixes) {
090    if (rowKeyPrefixes == null) {
091      throw new IllegalArgumentException("Invalid rowkey prefixes");
092    }
093
094    List<RowRange> list = new ArrayList<>();
095    for (byte[] rowKeyPrefix: rowKeyPrefixes) {
096      byte[] stopRow = ClientUtil.calculateTheClosestNextRowKeyForPrefix(rowKeyPrefix);
097      list.add(new RowRange(rowKeyPrefix, true, stopRow, false));
098    }
099    return list;
100  }
101
102  public List<RowRange> getRowRanges() {
103    // Used by hbase-rest
104    return this.rangeList;
105  }
106
107  @Override
108  public boolean filterAllRemaining() {
109    return done;
110  }
111
112  @Override
113  public boolean filterRowKey(Cell firstRowCell) {
114    if (filterAllRemaining()) return true;
115
116    // N.b. We can only do this after we're iterating over records. If we try to do
117    // it before, the Scan (and this filter) may not yet be fully initialized. This is a
118    // wart on Filter and something that'd be nice to clean up (like CP's in HBase2.0)
119    if (!ranges.isInitialized()) {
120      ranges.initialize(isReversed());
121    }
122
123    // If it is the first time of running, calculate the current range index for
124    // the row key. If index is out of bound which happens when the start row
125    // user sets is after the largest stop row of the ranges, stop the scan.
126    // If row key is after the current range, find the next range and update index.
127    byte[] rowArr = firstRowCell.getRowArray();
128    int length = firstRowCell.getRowLength();
129    int offset = firstRowCell.getRowOffset();
130    if (!ranges.hasFoundFirstRange() || !range.contains(rowArr, offset, length)) {
131      byte[] rowkey = CellUtil.cloneRow(firstRowCell);
132      index = ranges.getNextRangeIndex(rowkey);
133      if (ranges.isIterationComplete(index)) {
134        done = true;
135        currentReturnCode = ReturnCode.NEXT_ROW;
136        return false;
137      }
138      if(index != ROW_BEFORE_FIRST_RANGE) {
139        range = ranges.get(index);
140      } else {
141        range = ranges.get(0);
142      }
143      if (ranges.isExclusive()) {
144        ranges.resetExclusive();
145        currentReturnCode = ReturnCode.NEXT_ROW;
146        return false;
147      }
148      if (!ranges.hasFoundFirstRange()) {
149        if(index != ROW_BEFORE_FIRST_RANGE) {
150          currentReturnCode = ReturnCode.INCLUDE;
151        } else {
152          currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
153        }
154        ranges.setFoundFirstRange();
155      } else {
156        if (range.contains(rowArr, offset, length)) {
157          currentReturnCode = ReturnCode.INCLUDE;
158        } else {
159          currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
160        }
161      }
162    } else {
163      currentReturnCode = ReturnCode.INCLUDE;
164    }
165    return false;
166  }
167
168  @Deprecated
169  @Override
170  public ReturnCode filterKeyValue(final Cell ignored) {
171    return filterCell(ignored);
172  }
173
174  @Override
175  public ReturnCode filterCell(final Cell ignored) {
176    return currentReturnCode;
177  }
178
179  @Override
180  public Cell getNextCellHint(Cell currentKV) {
181    // skip to the next range's start row
182    // #getComparisonData lets us avoid the `if (reversed)` branch
183    byte[] comparisonData = range.getComparisonData();
184    return PrivateCellUtil.createFirstOnRow(comparisonData, 0, (short) comparisonData.length);
185  }
186
187  /**
188   * @return The filter serialized using pb
189   */
190  @Override
191  public byte[] toByteArray() {
192    FilterProtos.MultiRowRangeFilter.Builder builder = FilterProtos.MultiRowRangeFilter
193        .newBuilder();
194    for (RowRange range : rangeList) {
195      if (range != null) {
196        FilterProtos.RowRange.Builder rangebuilder = FilterProtos.RowRange.newBuilder();
197        if (range.startRow != null)
198          rangebuilder.setStartRow(UnsafeByteOperations.unsafeWrap(range.startRow));
199        rangebuilder.setStartRowInclusive(range.startRowInclusive);
200        if (range.stopRow != null)
201          rangebuilder.setStopRow(UnsafeByteOperations.unsafeWrap(range.stopRow));
202        rangebuilder.setStopRowInclusive(range.stopRowInclusive);
203        builder.addRowRangeList(rangebuilder.build());
204      }
205    }
206    return builder.build().toByteArray();
207  }
208
209  /**
210   * @param pbBytes A pb serialized instance
211   * @return An instance of MultiRowRangeFilter
212   * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
213   */
214  public static MultiRowRangeFilter parseFrom(final byte[] pbBytes)
215      throws DeserializationException {
216    FilterProtos.MultiRowRangeFilter proto;
217    try {
218      proto = FilterProtos.MultiRowRangeFilter.parseFrom(pbBytes);
219    } catch (InvalidProtocolBufferException e) {
220      throw new DeserializationException(e);
221    }
222    int length = proto.getRowRangeListCount();
223    List<FilterProtos.RowRange> rangeProtos = proto.getRowRangeListList();
224    List<RowRange> rangeList = new ArrayList<>(length);
225    for (FilterProtos.RowRange rangeProto : rangeProtos) {
226      RowRange range = new RowRange(rangeProto.hasStartRow() ? rangeProto.getStartRow()
227          .toByteArray() : null, rangeProto.getStartRowInclusive(), rangeProto.hasStopRow() ?
228              rangeProto.getStopRow().toByteArray() : null, rangeProto.getStopRowInclusive());
229      rangeList.add(range);
230    }
231    return new MultiRowRangeFilter(rangeList);
232  }
233
234  /**
235   * @param o the filter to compare
236   * @return true if and only if the fields of the filter that are serialized are equal to the
237   *         corresponding fields in other. Used for testing.
238   */
239  @Override
240  boolean areSerializedFieldsEqual(Filter o) {
241    if (o == this)
242      return true;
243    if (!(o instanceof MultiRowRangeFilter))
244      return false;
245
246    MultiRowRangeFilter other = (MultiRowRangeFilter) o;
247    if (this.rangeList.size() != other.rangeList.size())
248      return false;
249    for (int i = 0; i < rangeList.size(); ++i) {
250      RowRange thisRange = this.rangeList.get(i);
251      RowRange otherRange = other.rangeList.get(i);
252      if (!(Bytes.equals(thisRange.startRow, otherRange.startRow) && Bytes.equals(
253          thisRange.stopRow, otherRange.stopRow) && (thisRange.startRowInclusive ==
254          otherRange.startRowInclusive) && (thisRange.stopRowInclusive ==
255          otherRange.stopRowInclusive))) {
256        return false;
257      }
258    }
259    return true;
260  }
261
262  /**
263   * sort the ranges and if the ranges with overlap, then merge them.
264   *
265   * @param ranges the list of ranges to sort and merge.
266   * @return the ranges after sort and merge.
267   */
268  public static List<RowRange> sortAndMerge(List<RowRange> ranges) {
269    if (ranges.isEmpty()) {
270      throw new IllegalArgumentException("No ranges found.");
271    }
272    List<RowRange> invalidRanges = new ArrayList<>();
273    List<RowRange> newRanges = new ArrayList<>(ranges.size());
274    Collections.sort(ranges);
275    if(ranges.get(0).isValid()) {
276      if (ranges.size() == 1) {
277        newRanges.add(ranges.get(0));
278      }
279    } else {
280      invalidRanges.add(ranges.get(0));
281    }
282
283    byte[] lastStartRow = ranges.get(0).startRow;
284    boolean lastStartRowInclusive = ranges.get(0).startRowInclusive;
285    byte[] lastStopRow = ranges.get(0).stopRow;
286    boolean lastStopRowInclusive = ranges.get(0).stopRowInclusive;
287    int i = 1;
288    for (; i < ranges.size(); i++) {
289      RowRange range = ranges.get(i);
290      if (!range.isValid()) {
291        invalidRanges.add(range);
292      }
293      if(Bytes.equals(lastStopRow, HConstants.EMPTY_BYTE_ARRAY)) {
294        newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
295            lastStopRowInclusive));
296        break;
297      }
298      // with overlap in the ranges
299      if ((Bytes.compareTo(lastStopRow, range.startRow) > 0) ||
300          (Bytes.compareTo(lastStopRow, range.startRow) == 0 && !(lastStopRowInclusive == false &&
301          range.isStartRowInclusive() == false))) {
302        if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
303          newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
304              range.stopRowInclusive));
305          break;
306        }
307        // if first range contains second range, ignore the second range
308        if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
309          if((Bytes.compareTo(lastStopRow, range.stopRow) == 0)) {
310            if(lastStopRowInclusive == true || range.stopRowInclusive == true) {
311              lastStopRowInclusive = true;
312            }
313          }
314          if ((i + 1) == ranges.size()) {
315            newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
316                lastStopRowInclusive));
317          }
318        } else {
319          lastStopRow = range.stopRow;
320          lastStopRowInclusive = range.stopRowInclusive;
321          if ((i + 1) < ranges.size()) {
322            i++;
323            range = ranges.get(i);
324            if (!range.isValid()) {
325              invalidRanges.add(range);
326            }
327          } else {
328            newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
329                lastStopRowInclusive));
330            break;
331          }
332          while ((Bytes.compareTo(lastStopRow, range.startRow) > 0) ||
333              (Bytes.compareTo(lastStopRow, range.startRow) == 0 &&
334              (lastStopRowInclusive == true || range.startRowInclusive==true))) {
335            if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
336              break;
337            }
338            // if this first range contain second range, ignore the second range
339            if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
340              if(lastStopRowInclusive == true || range.stopRowInclusive == true) {
341                lastStopRowInclusive = true;
342              }
343              i++;
344              if (i < ranges.size()) {
345                range = ranges.get(i);
346                if (!range.isValid()) {
347                  invalidRanges.add(range);
348                }
349              } else {
350                break;
351              }
352            } else {
353              lastStopRow = range.stopRow;
354              lastStopRowInclusive = range.stopRowInclusive;
355              i++;
356              if (i < ranges.size()) {
357                range = ranges.get(i);
358                if (!range.isValid()) {
359                  invalidRanges.add(range);
360                }
361              } else {
362                break;
363              }
364            }
365          }
366          if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
367            if((Bytes.compareTo(lastStopRow, range.startRow) < 0) ||
368                (Bytes.compareTo(lastStopRow, range.startRow) == 0 &&
369                lastStopRowInclusive == false && range.startRowInclusive == false)) {
370              newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
371                  lastStopRowInclusive));
372              newRanges.add(range);
373            } else {
374              newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
375                  range.stopRowInclusive));
376              break;
377            }
378          }
379          newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
380              lastStopRowInclusive));
381          if ((i + 1) == ranges.size()) {
382            newRanges.add(range);
383          }
384          lastStartRow = range.startRow;
385          lastStartRowInclusive = range.startRowInclusive;
386          lastStopRow = range.stopRow;
387          lastStopRowInclusive = range.stopRowInclusive;
388        }
389      } else {
390        newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
391            lastStopRowInclusive));
392        if ((i + 1) == ranges.size()) {
393          newRanges.add(range);
394        }
395        lastStartRow = range.startRow;
396        lastStartRowInclusive = range.startRowInclusive;
397        lastStopRow = range.stopRow;
398        lastStopRowInclusive = range.stopRowInclusive;
399      }
400    }
401    // check the remaining ranges
402    for(int j=i; j < ranges.size(); j++) {
403      if(!ranges.get(j).isValid()) {
404        invalidRanges.add(ranges.get(j));
405      }
406    }
407    // if invalid range exists, throw the exception
408    if (invalidRanges.size() != 0) {
409      throwExceptionForInvalidRanges(invalidRanges, true);
410    }
411    // If no valid ranges found, throw the exception
412    if(newRanges.isEmpty()) {
413      throw new IllegalArgumentException("No valid ranges found.");
414    }
415    return newRanges;
416  }
417
418  private static void throwExceptionForInvalidRanges(List<RowRange> invalidRanges,
419      boolean details) {
420    StringBuilder sb = new StringBuilder();
421    sb.append(invalidRanges.size()).append(" invaild ranges.\n");
422    if (details) {
423      for (RowRange range : invalidRanges) {
424        sb.append(
425            "Invalid range: start row => " + Bytes.toString(range.startRow) + ", stop row => "
426                + Bytes.toString(range.stopRow)).append('\n');
427      }
428    }
429    throw new IllegalArgumentException(sb.toString());
430  }
431
432  private static abstract class BasicRowRange implements Comparable<BasicRowRange> {
433    protected byte[] startRow;
434    protected boolean startRowInclusive = true;
435    protected byte[] stopRow;
436    protected boolean stopRowInclusive = false;
437
438    public BasicRowRange() {
439    }
440    /**
441     * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
442     * start row of the table. If the stopRow is empty or null, set it to
443     * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
444     */
445    public BasicRowRange(String startRow, boolean startRowInclusive, String stopRow,
446        boolean stopRowInclusive) {
447      this((startRow == null || startRow.isEmpty()) ? HConstants.EMPTY_BYTE_ARRAY :
448        Bytes.toBytes(startRow), startRowInclusive,
449        (stopRow == null || stopRow.isEmpty()) ? HConstants.EMPTY_BYTE_ARRAY :
450        Bytes.toBytes(stopRow), stopRowInclusive);
451    }
452
453    public BasicRowRange(byte[] startRow,  boolean startRowInclusive, byte[] stopRow,
454        boolean stopRowInclusive) {
455      this.startRow = (startRow == null) ? HConstants.EMPTY_BYTE_ARRAY : startRow;
456      this.startRowInclusive = startRowInclusive;
457      this.stopRow = (stopRow == null) ? HConstants.EMPTY_BYTE_ARRAY :stopRow;
458      this.stopRowInclusive = stopRowInclusive;
459    }
460
461    public byte[] getStartRow() {
462      return startRow;
463    }
464
465    public byte[] getStopRow() {
466      return stopRow;
467    }
468
469    /**
470     * @return if start row is inclusive.
471     */
472    public boolean isStartRowInclusive() {
473      return startRowInclusive;
474    }
475
476    /**
477     * @return if stop row is inclusive.
478     */
479    public boolean isStopRowInclusive() {
480      return stopRowInclusive;
481    }
482
483    public boolean contains(byte[] row) {
484      return contains(row, 0, row.length);
485    }
486
487    public boolean contains(byte[] buffer, int offset, int length) {
488      if(startRowInclusive) {
489        if(stopRowInclusive) {
490          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
491              && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
492                  Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
493        } else {
494          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
495              && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
496                  Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
497        }
498      } else {
499        if(stopRowInclusive) {
500          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
501              && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
502                  Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
503        } else {
504          return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
505              && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
506                  Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
507        }
508      }
509    }
510
511    public boolean isValid() {
512      return Bytes.equals(startRow, HConstants.EMPTY_BYTE_ARRAY)
513          || Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
514          || Bytes.compareTo(startRow, stopRow) < 0
515          || (Bytes.compareTo(startRow, stopRow) == 0 && stopRowInclusive == true);
516    }
517
518    @Override
519    public boolean equals(Object obj){
520      if (!(obj instanceof BasicRowRange)) {
521        return false;
522      }
523      if (this == obj) {
524        return true;
525      }
526      BasicRowRange rr = (BasicRowRange) obj;
527      return Bytes.equals(this.stopRow, rr.getStopRow()) &&
528          Bytes.equals(this.startRow, this.getStartRow()) &&
529          this.startRowInclusive == rr.isStartRowInclusive() &&
530          this.stopRowInclusive == rr.isStopRowInclusive();
531    }
532
533    @Override
534    public int hashCode() {
535      return Objects.hash(Bytes.hashCode(this.stopRow),
536          Bytes.hashCode(this.startRow),
537          this.startRowInclusive,
538          this.stopRowInclusive);
539    }
540
541    /**
542     * Returns the data to be used to compare {@code this} to another object.
543     */
544    public abstract byte[] getComparisonData();
545
546    /**
547     * Returns whether the bounding row used for binary-search is inclusive or not.
548     *
549     * For forward scans, we would check the starRow, but we would check the stopRow for
550     * the reverse scan case.
551     */
552    public abstract boolean isSearchRowInclusive();
553
554    @Override
555    public int compareTo(BasicRowRange other) {
556      byte[] left;
557      byte[] right;
558      if (isAscendingOrder()) {
559        left = this.getComparisonData();
560        right = other.getComparisonData();
561      } else {
562        left = other.getComparisonData();
563        right = this.getComparisonData();
564      }
565      return Bytes.compareTo(left, right);
566    }
567
568    public abstract boolean isAscendingOrder();
569  }
570
571  /**
572   * Internal RowRange that reverses the sort-order to handle reverse scans.
573   */
574  @InterfaceAudience.Private
575  private static class ReversedRowRange extends BasicRowRange {
576    public ReversedRowRange(byte[] startRow,  boolean startRowInclusive, byte[] stopRow,
577        boolean stopRowInclusive) {
578      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
579    }
580
581    @Override
582    public byte[] getComparisonData() {
583      return this.stopRow;
584    }
585
586    @Override
587    public boolean isSearchRowInclusive() {
588      return this.stopRowInclusive;
589    }
590
591    @Override
592    public boolean isAscendingOrder() {
593      return false;
594    }
595  }
596
597  @InterfaceAudience.Public
598  public static class RowRange extends BasicRowRange {
599    public RowRange() {
600    }
601    /**
602     * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
603     * start row of the table. If the stopRow is empty or null, set it to
604     * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
605     */
606    public RowRange(String startRow, boolean startRowInclusive, String stopRow,
607        boolean stopRowInclusive) {
608      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
609    }
610
611    public RowRange(byte[] startRow,  boolean startRowInclusive, byte[] stopRow,
612        boolean stopRowInclusive) {
613      super(startRow, startRowInclusive, stopRow, stopRowInclusive);
614    }
615
616    @Override
617    public byte[] getComparisonData() {
618      return startRow;
619    }
620
621    @Override
622    public boolean isSearchRowInclusive() {
623      return startRowInclusive;
624    }
625
626    @Override
627    public boolean isAscendingOrder() {
628      return true;
629    }
630  }
631
632  /**
633   * Abstraction over the ranges of rows to return from this filter, regardless of forward or
634   * reverse scans being used. This Filter can use this class, agnostic of iteration direction,
635   * as the same algorithm can be applied in both cases.
636   */
637  @InterfaceAudience.Private
638  private static class RangeIteration {
639    private boolean exclusive = false;
640    private boolean initialized = false;
641    private boolean foundFirstRange = false;
642    private boolean reversed = false;
643    private final List<RowRange> sortedAndMergedRanges;
644    private List<? extends BasicRowRange> ranges;
645
646    public RangeIteration(List<RowRange> sortedAndMergedRanges) {
647      this.sortedAndMergedRanges = sortedAndMergedRanges;
648    }
649
650    void initialize(boolean reversed) {
651      // Avoid double initialization
652      assert !this.initialized;
653      this.reversed = reversed;
654      if (reversed) {
655        // If we are doing a reverse scan, we can reverse the ranges (both the elements in
656        // the list as well as their start/stop key), and use the same algorithm.
657        this.ranges = flipAndReverseRanges(sortedAndMergedRanges);
658      } else {
659        this.ranges = sortedAndMergedRanges;
660      }
661      this.initialized = true;
662    }
663
664    /**
665     * Rebuilds the sorted ranges (by startKey) into an equivalent sorted list of ranges, only by
666     * stopKey instead. Descending order and the ReversedRowRange compareTo implementation make
667     * sure that we can use Collections.binarySearch().
668     */
669    static List<ReversedRowRange> flipAndReverseRanges(List<RowRange> ranges) {
670      List<ReversedRowRange> flippedRanges = new ArrayList<>(ranges.size());
671      for (int i = ranges.size() - 1; i >= 0; i--) {
672        RowRange origRange = ranges.get(i);
673        ReversedRowRange newRowRange = new ReversedRowRange(
674            origRange.startRow, origRange.startRowInclusive, origRange.stopRow,
675            origRange.isStopRowInclusive());
676        flippedRanges.add(newRowRange);
677      }
678      return flippedRanges;
679    }
680
681    /**
682     * Calculates the position where the given rowkey fits in the ranges list.
683     *
684     * @param rowKey the row key to calculate
685     * @return index the position of the row key
686     */
687    public int getNextRangeIndex(byte[] rowKey) {
688      BasicRowRange temp;
689      if (reversed) {
690        temp = new ReversedRowRange(null, true, rowKey, true);
691      } else {
692        temp = new RowRange(rowKey, true, null, true);
693      }
694      // Because we make sure that `ranges` has the correct natural ordering (given it containing
695      // RowRange or ReverseRowRange objects). This keeps us from having to have two different
696      // implementations below.
697      final int index = Collections.binarySearch(ranges, temp);
698      if (index < 0) {
699        int insertionPosition = -index - 1;
700        // check if the row key in the range before the insertion position
701        if (insertionPosition != 0 && ranges.get(insertionPosition - 1).contains(rowKey)) {
702          return insertionPosition - 1;
703        }
704        // check if the row key is before the first range
705        if (insertionPosition == 0 && !ranges.get(insertionPosition).contains(rowKey)) {
706          return ROW_BEFORE_FIRST_RANGE;
707        }
708        if (!foundFirstRange) {
709          foundFirstRange = true;
710        }
711        return insertionPosition;
712      }
713      // the row key equals one of the start keys, and the the range exclude the start key
714      if(ranges.get(index).isSearchRowInclusive() == false) {
715        exclusive = true;
716      }
717      return index;
718    }
719
720    /**
721     * Sets {@link #foundFirstRange} to {@code true}, indicating that we found a matching row range.
722     */
723    public void setFoundFirstRange() {
724      this.foundFirstRange = true;
725    }
726
727    /**
728     * Gets the RowRange at the given offset.
729     */
730    @SuppressWarnings("unchecked")
731    public <T extends BasicRowRange> T get(int i) {
732      return (T) ranges.get(i);
733    }
734
735    /**
736     * Returns true if the first matching row range was found.
737     */
738    public boolean hasFoundFirstRange() {
739      return foundFirstRange;
740    }
741
742    /**
743     * Returns true if the current range's key is exclusive
744     */
745    public boolean isExclusive() {
746      return exclusive;
747    }
748
749    /**
750     * Resets the exclusive flag.
751     */
752    public void resetExclusive() {
753      exclusive = false;
754    }
755
756    /**
757     * Returns true if this class has been initialized by calling {@link #initialize(boolean)}.
758     */
759    public boolean isInitialized() {
760      return initialized;
761    }
762
763    /**
764     * Returns true if we exhausted searching all row ranges.
765     */
766    public boolean isIterationComplete(int index) {
767      return index >= ranges.size();
768    }
769  }
770
771  @Override
772  public boolean equals(Object obj) {
773    return obj instanceof Filter && areSerializedFieldsEqual((Filter) obj);
774  }
775
776  @Override
777  public int hashCode() {
778    return Objects.hash(this.ranges);
779  }
780}