View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.filter;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.List;
24  
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.HConstants;
27  import org.apache.hadoop.hbase.KeyValueUtil;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.hbase.exceptions.DeserializationException;
31  import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
32  import org.apache.hadoop.hbase.util.ByteStringer;
33  import org.apache.hadoop.hbase.util.Bytes;
34  
35  import com.google.protobuf.InvalidProtocolBufferException;
36  
37  /**
38   * Filter to support scan multiple row key ranges. It can construct the row key ranges from the
39   * passed list which can be accessed by each region server.
40   *
41   * HBase is quite efficient when scanning only one small row key range. If user needs to specify
42   * multiple row key ranges in one scan, the typical solutions are: 1. through FilterList which is a
43   * list of row key Filters, 2. using the SQL layer over HBase to join with two table, such as hive,
44   * phoenix etc. However, both solutions are inefficient. Both of them can't utilize the range info
45   * to perform fast forwarding during scan which is quite time consuming. If the number of ranges
46   * are quite big (e.g. millions), join is a proper solution though it is slow. However, there are
47   * cases that user wants to specify a small number of ranges to scan (e.g. <1000 ranges). Both
48   * solutions can't provide satisfactory performance in such case. MultiRowRangeFilter is to support
49   * such usec ase (scan multiple row key ranges), which can construct the row key ranges from user
50   * specified list and perform fast-forwarding during scan. Thus, the scan will be quite efficient.
51   */
52  @InterfaceAudience.Public
53  @InterfaceStability.Evolving
54  public class MultiRowRangeFilter extends FilterBase {
55  
56    private List<RowRange> rangeList;
57  
58    private static final int ROW_BEFORE_FIRST_RANGE = -1;
59    private boolean EXCLUSIVE = false;
60    private boolean done = false;
61    private boolean initialized = false;
62    private int index;
63    private RowRange range;
64    private ReturnCode currentReturnCode;
65  
66    /**
67     * @param list A list of <code>RowRange</code>
68     * @throws java.io.IOException
69     *           throw an exception if the range list is not in an natural order or any
70     *           <code>RowRange</code> is invalid
71     */
72    public MultiRowRangeFilter(List<RowRange> list) throws IOException {
73      this.rangeList = sortAndMerge(list);
74    }
75  
76    @Override
77    public boolean filterAllRemaining() {
78      return done;
79    }
80  
81    public List<RowRange> getRowRanges() {
82      return this.rangeList;
83    }
84  
85    @Override
86    public boolean filterRowKey(byte[] buffer, int offset, int length) {
87      // If it is the first time of running, calculate the current range index for
88      // the row key. If index is out of bound which happens when the start row
89      // user sets is after the largest stop row of the ranges, stop the scan.
90      // If row key is after the current range, find the next range and update index.
91      if (!initialized || !range.contains(buffer, offset, length)) {
92        byte[] rowkey = new byte[length];
93        System.arraycopy(buffer, offset, rowkey, 0, length);
94        index = getNextRangeIndex(rowkey);
95        if (index >= rangeList.size()) {
96          done = true;
97          currentReturnCode = ReturnCode.NEXT_ROW;
98          return false;
99        }
100       if(index != ROW_BEFORE_FIRST_RANGE) {
101         range = rangeList.get(index);
102       } else {
103         range = rangeList.get(0);
104       }
105       if (EXCLUSIVE) {
106         EXCLUSIVE = false;
107         currentReturnCode = ReturnCode.NEXT_ROW;
108         return false;
109       }
110       if (!initialized) {
111         if(index != ROW_BEFORE_FIRST_RANGE) {
112           currentReturnCode = ReturnCode.INCLUDE;
113         } else {
114           currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
115         }
116         initialized = true;
117       } else {
118         if (range.contains(buffer, offset, length)) {
119           currentReturnCode = ReturnCode.INCLUDE;
120         } else currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
121       }
122     } else {
123       currentReturnCode = ReturnCode.INCLUDE;
124     }
125     return false;
126   }
127 
128   @Override
129   public ReturnCode filterKeyValue(Cell ignored) {
130     return currentReturnCode;
131   }
132 
133   @Override
134   public Cell getNextCellHint(Cell currentKV) {
135     // skip to the next range's start row
136     return KeyValueUtil.createFirstOnRow(range.startRow);
137   }
138 
139   /**
140    * @return The filter serialized using pb
141    */
142   public byte[] toByteArray() {
143     FilterProtos.MultiRowRangeFilter.Builder builder = FilterProtos.MultiRowRangeFilter
144         .newBuilder();
145     for (RowRange range : rangeList) {
146       if (range != null) {
147         FilterProtos.RowRange.Builder rangebuilder = FilterProtos.RowRange.newBuilder();
148         if (range.startRow != null)
149           rangebuilder.setStartRow(ByteStringer.wrap(range.startRow));
150         rangebuilder.setStartRowInclusive(range.startRowInclusive);
151         if (range.stopRow != null)
152           rangebuilder.setStopRow(ByteStringer.wrap(range.stopRow));
153         rangebuilder.setStopRowInclusive(range.stopRowInclusive);
154         range.isScan = Bytes.equals(range.startRow, range.stopRow) ? 1 : 0;
155         builder.addRowRangeList(rangebuilder.build());
156       }
157     }
158     return builder.build().toByteArray();
159   }
160 
161   /**
162    * @param pbBytes A pb serialized instance
163    * @return An instance of MultiRowRangeFilter
164    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
165    */
166   public static MultiRowRangeFilter parseFrom(final byte[] pbBytes)
167       throws DeserializationException {
168     FilterProtos.MultiRowRangeFilter proto;
169     try {
170       proto = FilterProtos.MultiRowRangeFilter.parseFrom(pbBytes);
171     } catch (InvalidProtocolBufferException e) {
172       throw new DeserializationException(e);
173     }
174     int length = proto.getRowRangeListCount();
175     List<FilterProtos.RowRange> rangeProtos = proto.getRowRangeListList();
176     List<RowRange> rangeList = new ArrayList<RowRange>(length);
177     for (FilterProtos.RowRange rangeProto : rangeProtos) {
178       RowRange range = new RowRange(rangeProto.hasStartRow() ? rangeProto.getStartRow()
179           .toByteArray() : null, rangeProto.getStartRowInclusive(), rangeProto.hasStopRow() ?
180               rangeProto.getStopRow().toByteArray() : null, rangeProto.getStopRowInclusive());
181       rangeList.add(range);
182     }
183     try {
184       return new MultiRowRangeFilter(rangeList);
185     } catch (IOException e) {
186       throw new DeserializationException("Fail to instantiate the MultiRowRangeFilter", e);
187     }
188   }
189 
190   /**
191    * @param o the filter to compare
192    * @return true if and only if the fields of the filter that are serialized are equal to the
193    *         corresponding fields in other. Used for testing.
194    */
195   boolean areSerializedFieldsEqual(Filter o) {
196     if (o == this)
197       return true;
198     if (!(o instanceof MultiRowRangeFilter))
199       return false;
200 
201     MultiRowRangeFilter other = (MultiRowRangeFilter) o;
202     if (this.rangeList.size() != other.rangeList.size())
203       return false;
204     for (int i = 0; i < rangeList.size(); ++i) {
205       RowRange thisRange = this.rangeList.get(i);
206       RowRange otherRange = other.rangeList.get(i);
207       if (!(Bytes.equals(thisRange.startRow, otherRange.startRow) && Bytes.equals(
208           thisRange.stopRow, otherRange.stopRow) && (thisRange.startRowInclusive ==
209           otherRange.startRowInclusive) && (thisRange.stopRowInclusive ==
210           otherRange.stopRowInclusive))) {
211         return false;
212       }
213     }
214     return true;
215   }
216 
217   /**
218    * calculate the position where the row key in the ranges list.
219    *
220    * @param rowKey the row key to calculate
221    * @return index the position of the row key
222    */
223   private int getNextRangeIndex(byte[] rowKey) {
224     RowRange temp = new RowRange(rowKey, true, null, true);
225     int index = Collections.binarySearch(rangeList, temp);
226     if (index < 0) {
227       int insertionPosition = -index - 1;
228       // check if the row key in the range before the insertion position
229       if (insertionPosition != 0 && rangeList.get(insertionPosition - 1).contains(rowKey)) {
230         return insertionPosition - 1;
231       }
232       // check if the row key is before the first range
233       if (insertionPosition == 0 && !rangeList.get(insertionPosition).contains(rowKey)) {
234         return ROW_BEFORE_FIRST_RANGE;
235       }
236       if (!initialized) {
237         initialized = true;
238       }
239       return insertionPosition;
240     }
241     // the row key equals one of the start keys, and the the range exclude the start key
242     if(rangeList.get(index).startRowInclusive == false) {
243       EXCLUSIVE = true;
244     }
245     return index;
246   }
247 
248   /**
249    * sort the ranges and if the ranges with overlap, then merge them.
250    *
251    * @param ranges the list of ranges to sort and merge.
252    * @return the ranges after sort and merge.
253    */
254   public static List<RowRange> sortAndMerge(List<RowRange> ranges) {
255     if (ranges.size() == 0) {
256       throw new IllegalArgumentException("No ranges found.");
257     }
258     List<RowRange> invalidRanges = new ArrayList<RowRange>();
259     List<RowRange> newRanges = new ArrayList<RowRange>(ranges.size());
260     Collections.sort(ranges);
261     if(ranges.get(0).isValid()) {
262       if (ranges.size() == 1) {
263         newRanges.add(ranges.get(0));
264       }
265     } else {
266       invalidRanges.add(ranges.get(0));
267     }
268 
269     byte[] lastStartRow = ranges.get(0).startRow;
270     boolean lastStartRowInclusive = ranges.get(0).startRowInclusive;
271     byte[] lastStopRow = ranges.get(0).stopRow;
272     boolean lastStopRowInclusive = ranges.get(0).stopRowInclusive;
273     int i = 1;
274     for (; i < ranges.size(); i++) {
275       RowRange range = ranges.get(i);
276       if (!range.isValid()) {
277         invalidRanges.add(range);
278       }
279       if(Bytes.equals(lastStopRow, HConstants.EMPTY_BYTE_ARRAY)) {
280         newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
281             lastStopRowInclusive));
282         break;
283       }
284       // with overlap in the ranges
285       if ((Bytes.compareTo(lastStopRow, range.startRow) > 0) ||
286           (Bytes.compareTo(lastStopRow, range.startRow) == 0 && !(lastStopRowInclusive == false &&
287           range.isStartRowInclusive() == false))) {
288         if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
289           newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
290               range.stopRowInclusive));
291           break;
292         }
293         // if first range contains second range, ignore the second range
294         if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
295           if((Bytes.compareTo(lastStopRow, range.stopRow) == 0)) {
296             if(lastStopRowInclusive == true || range.stopRowInclusive == true) {
297               lastStopRowInclusive = true;
298             }
299           }
300           if ((i + 1) == ranges.size()) {
301             newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
302                 lastStopRowInclusive));
303           }
304         } else {
305           lastStopRow = range.stopRow;
306           lastStopRowInclusive = range.stopRowInclusive;
307           if ((i + 1) < ranges.size()) {
308             i++;
309             range = ranges.get(i);
310             if (!range.isValid()) {
311               invalidRanges.add(range);
312             }
313           } else {
314             newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
315                 lastStopRowInclusive));
316             break;
317           }
318           while ((Bytes.compareTo(lastStopRow, range.startRow) > 0) ||
319               (Bytes.compareTo(lastStopRow, range.startRow) == 0 &&
320               (lastStopRowInclusive == true || range.startRowInclusive==true))) {
321             if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
322               break;
323             }
324             // if this first range contain second range, ignore the second range
325             if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
326               if(lastStopRowInclusive == true || range.stopRowInclusive == true) {
327                 lastStopRowInclusive = true;
328               }
329               i++;
330               if (i < ranges.size()) {
331                 range = ranges.get(i);
332                 if (!range.isValid()) {
333                   invalidRanges.add(range);
334                 }
335               } else {
336                 break;
337               }
338             } else {
339               lastStopRow = range.stopRow;
340               lastStopRowInclusive = range.stopRowInclusive;
341               i++;
342               if (i < ranges.size()) {
343                 range = ranges.get(i);
344                 if (!range.isValid()) {
345                   invalidRanges.add(range);
346                 }
347               } else {
348                 break;
349               }
350             }
351           }
352           if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
353             if((Bytes.compareTo(lastStopRow, range.startRow) < 0) ||
354                 (Bytes.compareTo(lastStopRow, range.startRow) == 0 &&
355                 lastStopRowInclusive == false && range.startRowInclusive == false)) {
356               newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
357                   lastStopRowInclusive));
358               newRanges.add(range);
359             } else {
360               newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
361                   range.stopRowInclusive));
362               break;
363             }
364           }
365           newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
366               lastStopRowInclusive));
367           if ((i + 1) == ranges.size()) {
368             newRanges.add(range);
369           }
370           lastStartRow = range.startRow;
371           lastStartRowInclusive = range.startRowInclusive;
372           lastStopRow = range.stopRow;
373           lastStopRowInclusive = range.stopRowInclusive;
374         }
375       } else {
376         newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
377             lastStopRowInclusive));
378         if ((i + 1) == ranges.size()) {
379           newRanges.add(range);
380         }
381         lastStartRow = range.startRow;
382         lastStartRowInclusive = range.startRowInclusive;
383         lastStopRow = range.stopRow;
384         lastStopRowInclusive = range.stopRowInclusive;
385       }
386     }
387     // check the remaining ranges
388     for(int j=i; j < ranges.size(); j++) {
389       if(!ranges.get(j).isValid()) {
390         invalidRanges.add(ranges.get(j));
391       }
392     }
393     // if invalid range exists, throw the exception
394     if (invalidRanges.size() != 0) {
395       throwExceptionForInvalidRanges(invalidRanges, true);
396     }
397     // If no valid ranges found, throw the exception
398     if(newRanges.size() == 0) {
399       throw new IllegalArgumentException("No valid ranges found.");
400     }
401     return newRanges;
402   }
403 
404   private static void throwExceptionForInvalidRanges(List<RowRange> invalidRanges,
405       boolean details) {
406     StringBuilder sb = new StringBuilder();
407     sb.append(invalidRanges.size()).append(" invaild ranges.\n");
408     if (details) {
409       for (RowRange range : invalidRanges) {
410         sb.append(
411             "Invalid range: start row => " + Bytes.toString(range.startRow) + ", stop row => "
412                 + Bytes.toString(range.stopRow)).append('\n');
413       }
414     }
415     throw new IllegalArgumentException(sb.toString());
416   }
417 
418   @InterfaceAudience.Public
419   @InterfaceStability.Evolving
420   public static class RowRange implements Comparable<RowRange> {
421     private byte[] startRow;
422     private boolean startRowInclusive = true;
423     private byte[] stopRow;
424     private boolean stopRowInclusive = false;
425     private int isScan = 0;
426 
427     public RowRange() {
428     }
429     /**
430      * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
431      * start row of the table. If the stopRow is empty or null, set it to
432      * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
433      */
434     public RowRange(String startRow, boolean startRowInclusive, String stopRow,
435         boolean stopRowInclusive) {
436       this((startRow == null || startRow.isEmpty()) ? HConstants.EMPTY_BYTE_ARRAY :
437         Bytes.toBytes(startRow), startRowInclusive,
438         (stopRow == null || stopRow.isEmpty()) ? HConstants.EMPTY_BYTE_ARRAY :
439         Bytes.toBytes(stopRow), stopRowInclusive);
440     }
441 
442     public RowRange(byte[] startRow,  boolean startRowInclusive, byte[] stopRow,
443         boolean stopRowInclusive) {
444       this.startRow = (startRow == null) ? HConstants.EMPTY_BYTE_ARRAY : startRow;
445       this.startRowInclusive = startRowInclusive;
446       this.stopRow = (stopRow == null) ? HConstants.EMPTY_BYTE_ARRAY :stopRow;
447       this.stopRowInclusive = stopRowInclusive;
448       isScan = Bytes.equals(startRow, stopRow) ? 1 : 0;
449     }
450 
451     public byte[] getStartRow() {
452       return startRow;
453     }
454 
455     public byte[] getStopRow() {
456       return stopRow;
457     }
458 
459     /**
460      * @return if start row is inclusive.
461      */
462     public boolean isStartRowInclusive() {
463       return startRowInclusive;
464     }
465 
466     /**
467      * @return if stop row is inclusive.
468      */
469     public boolean isStopRowInclusive() {
470       return stopRowInclusive;
471     }
472 
473     public boolean contains(byte[] row) {
474       return contains(row, 0, row.length);
475     }
476 
477     public boolean contains(byte[] buffer, int offset, int length) {
478       if(startRowInclusive) {
479         if(stopRowInclusive) {
480           return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
481               && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
482                   Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= isScan);
483         } else {
484           return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
485               && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
486                   Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < isScan);
487         }
488       } else {
489         if(stopRowInclusive) {
490           return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
491               && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
492                   Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= isScan);
493         } else {
494           return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
495               && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
496                   Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < isScan);
497         }
498       }
499     }
500 
501     @Override
502     public int compareTo(RowRange other) {
503       return Bytes.compareTo(this.startRow, other.startRow);
504     }
505 
506     public boolean isValid() {
507       return Bytes.equals(startRow, HConstants.EMPTY_BYTE_ARRAY)
508           || Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
509           || Bytes.compareTo(startRow, stopRow) < 0
510           || (Bytes.compareTo(startRow, stopRow) == 0 && stopRowInclusive == true);
511     }
512   }
513 }