View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.filter;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.List;
24  
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.HConstants;
27  import org.apache.hadoop.hbase.KeyValueUtil;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.hbase.exceptions.DeserializationException;
31  import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
32  import org.apache.hadoop.hbase.util.ByteStringer;
33  import org.apache.hadoop.hbase.util.Bytes;
34  
35  import com.google.protobuf.InvalidProtocolBufferException;
36  
37  /**
38   * Filter to support scan multiple row key ranges. It can construct the row key ranges from the
39   * passed list which can be accessed by each region server.
40   *
41   * HBase is quite efficient when scanning a single small row key range. When a user needs to
42   * specify multiple row key ranges in one scan, the typical solutions are: 1. a FilterList made
43   * of row key Filters, or 2. a SQL layer over HBase (such as Hive or Phoenix) that joins two
44   * tables. However, both solutions are inefficient: neither can use the range information to
45   * fast-forward during the scan, so the scan is quite time consuming. If the number of ranges
46   * is very large (e.g. millions), a join is a proper solution even though it is slow. However,
47   * there are cases where the user wants to scan only a small number of ranges (e.g. fewer than
48   * 1000), and neither solution provides satisfactory performance in such cases.
49   * MultiRowRangeFilter supports this use case (scanning multiple row key ranges): it constructs
50   * the row key ranges from a user-specified list and fast-forwards during the scan, so the scan is efficient.
51   */
52  @InterfaceAudience.Public
53  @InterfaceStability.Evolving
54  public class MultiRowRangeFilter extends FilterBase {
55  
56    private List<RowRange> rangeList;
57  
58    private static final int ROW_BEFORE_FIRST_RANGE = -1;
59    private boolean EXCLUSIVE = false;
60    private boolean done = false;
61    private boolean initialized = false;
62    private int index;
63    private RowRange range;
64    private ReturnCode currentReturnCode;
65  
66    /**
67     * @param list A list of <code>RowRange</code>
68     * @throws java.io.IOException
69     *           throw an exception if the range list is not in an natural order or any
70     *           <code>RowRange</code> is invalid
71     */
72    public MultiRowRangeFilter(List<RowRange> list) throws IOException {
73      this.rangeList = sortAndMerge(list);
74    }
75  
76    @Override
77    public boolean filterAllRemaining() {
78      return done;
79    }
80  
81    public List<RowRange> getRowRanges() {
82      return this.rangeList;
83    }
84  
85    @Override
86    public boolean filterRowKey(byte[] buffer, int offset, int length) {
87      // If it is the first time of running, calculate the current range index for
88      // the row key. If index is out of bound which happens when the start row
89      // user sets is after the largest stop row of the ranges, stop the scan.
90      // If row key is after the current range, find the next range and update index.
91      if (!initialized || !range.contains(buffer, offset, length)) {
92        byte[] rowkey = new byte[length];
93        System.arraycopy(buffer, offset, rowkey, 0, length);
94        index = getNextRangeIndex(rowkey);
95        if (index >= rangeList.size()) {
96          done = true;
97          currentReturnCode = ReturnCode.NEXT_ROW;
98          return false;
99        }
100       if(index != ROW_BEFORE_FIRST_RANGE) {
101         range = rangeList.get(index);
102       } else {
103         range = rangeList.get(0);
104       }
105       if (EXCLUSIVE) {
106         EXCLUSIVE = false;
107         currentReturnCode = ReturnCode.NEXT_ROW;
108         return false;
109       }
110       if (!initialized) {
111         if(index != ROW_BEFORE_FIRST_RANGE) {
112           currentReturnCode = ReturnCode.INCLUDE;
113         } else {
114           currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
115         }
116         initialized = true;
117       } else {
118         if (range.contains(buffer, offset, length)) {
119           currentReturnCode = ReturnCode.INCLUDE;
120         } else {
121           currentReturnCode = ReturnCode.SEEK_NEXT_USING_HINT;
122         }
123       }
124     } else {
125       currentReturnCode = ReturnCode.INCLUDE;
126     }
127     return false;
128   }
129 
130   @Override
131   public ReturnCode filterKeyValue(Cell ignored) {
132     return currentReturnCode;
133   }
134 
135   @Override
136   public Cell getNextCellHint(Cell currentKV) {
137     // skip to the next range's start row
138     return KeyValueUtil.createFirstOnRow(range.startRow);
139   }
140 
141   /**
142    * @return The filter serialized using pb
143    */
144   public byte[] toByteArray() {
145     FilterProtos.MultiRowRangeFilter.Builder builder = FilterProtos.MultiRowRangeFilter
146         .newBuilder();
147     for (RowRange range : rangeList) {
148       if (range != null) {
149         FilterProtos.RowRange.Builder rangebuilder = FilterProtos.RowRange.newBuilder();
150         if (range.startRow != null)
151           rangebuilder.setStartRow(ByteStringer.wrap(range.startRow));
152         rangebuilder.setStartRowInclusive(range.startRowInclusive);
153         if (range.stopRow != null)
154           rangebuilder.setStopRow(ByteStringer.wrap(range.stopRow));
155         rangebuilder.setStopRowInclusive(range.stopRowInclusive);
156         builder.addRowRangeList(rangebuilder.build());
157       }
158     }
159     return builder.build().toByteArray();
160   }
161 
162   /**
163    * @param pbBytes A pb serialized instance
164    * @return An instance of MultiRowRangeFilter
165    * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
166    */
167   public static MultiRowRangeFilter parseFrom(final byte[] pbBytes)
168       throws DeserializationException {
169     FilterProtos.MultiRowRangeFilter proto;
170     try {
171       proto = FilterProtos.MultiRowRangeFilter.parseFrom(pbBytes);
172     } catch (InvalidProtocolBufferException e) {
173       throw new DeserializationException(e);
174     }
175     int length = proto.getRowRangeListCount();
176     List<FilterProtos.RowRange> rangeProtos = proto.getRowRangeListList();
177     List<RowRange> rangeList = new ArrayList<RowRange>(length);
178     for (FilterProtos.RowRange rangeProto : rangeProtos) {
179       RowRange range = new RowRange(rangeProto.hasStartRow() ? rangeProto.getStartRow()
180           .toByteArray() : null, rangeProto.getStartRowInclusive(), rangeProto.hasStopRow() ?
181               rangeProto.getStopRow().toByteArray() : null, rangeProto.getStopRowInclusive());
182       rangeList.add(range);
183     }
184     try {
185       return new MultiRowRangeFilter(rangeList);
186     } catch (IOException e) {
187       throw new DeserializationException("Fail to instantiate the MultiRowRangeFilter", e);
188     }
189   }
190 
191   /**
192    * @param o the filter to compare
193    * @return true if and only if the fields of the filter that are serialized are equal to the
194    *         corresponding fields in other. Used for testing.
195    */
196   boolean areSerializedFieldsEqual(Filter o) {
197     if (o == this)
198       return true;
199     if (!(o instanceof MultiRowRangeFilter))
200       return false;
201 
202     MultiRowRangeFilter other = (MultiRowRangeFilter) o;
203     if (this.rangeList.size() != other.rangeList.size())
204       return false;
205     for (int i = 0; i < rangeList.size(); ++i) {
206       RowRange thisRange = this.rangeList.get(i);
207       RowRange otherRange = other.rangeList.get(i);
208       if (!(Bytes.equals(thisRange.startRow, otherRange.startRow) && Bytes.equals(
209           thisRange.stopRow, otherRange.stopRow) && (thisRange.startRowInclusive ==
210           otherRange.startRowInclusive) && (thisRange.stopRowInclusive ==
211           otherRange.stopRowInclusive))) {
212         return false;
213       }
214     }
215     return true;
216   }
217 
218   /**
219    * calculate the position where the row key in the ranges list.
220    *
221    * @param rowKey the row key to calculate
222    * @return index the position of the row key
223    */
224   private int getNextRangeIndex(byte[] rowKey) {
225     RowRange temp = new RowRange(rowKey, true, null, true);
226     int index = Collections.binarySearch(rangeList, temp);
227     if (index < 0) {
228       int insertionPosition = -index - 1;
229       // check if the row key in the range before the insertion position
230       if (insertionPosition != 0 && rangeList.get(insertionPosition - 1).contains(rowKey)) {
231         return insertionPosition - 1;
232       }
233       // check if the row key is before the first range
234       if (insertionPosition == 0 && !rangeList.get(insertionPosition).contains(rowKey)) {
235         return ROW_BEFORE_FIRST_RANGE;
236       }
237       if (!initialized) {
238         initialized = true;
239       }
240       return insertionPosition;
241     }
242     // the row key equals one of the start keys, and the the range exclude the start key
243     if(rangeList.get(index).startRowInclusive == false) {
244       EXCLUSIVE = true;
245     }
246     return index;
247   }
248 
249   /**
250    * sort the ranges and if the ranges with overlap, then merge them.
251    *
252    * @param ranges the list of ranges to sort and merge.
253    * @return the ranges after sort and merge.
254    */
255   public static List<RowRange> sortAndMerge(List<RowRange> ranges) {
256     if (ranges.size() == 0) {
257       throw new IllegalArgumentException("No ranges found.");
258     }
259     List<RowRange> invalidRanges = new ArrayList<RowRange>();
260     List<RowRange> newRanges = new ArrayList<RowRange>(ranges.size());
261     Collections.sort(ranges);
262     if(ranges.get(0).isValid()) {
263       if (ranges.size() == 1) {
264         newRanges.add(ranges.get(0));
265       }
266     } else {
267       invalidRanges.add(ranges.get(0));
268     }
269 
270     byte[] lastStartRow = ranges.get(0).startRow;
271     boolean lastStartRowInclusive = ranges.get(0).startRowInclusive;
272     byte[] lastStopRow = ranges.get(0).stopRow;
273     boolean lastStopRowInclusive = ranges.get(0).stopRowInclusive;
274     int i = 1;
275     for (; i < ranges.size(); i++) {
276       RowRange range = ranges.get(i);
277       if (!range.isValid()) {
278         invalidRanges.add(range);
279       }
280       if(Bytes.equals(lastStopRow, HConstants.EMPTY_BYTE_ARRAY)) {
281         newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
282             lastStopRowInclusive));
283         break;
284       }
285       // with overlap in the ranges
286       if ((Bytes.compareTo(lastStopRow, range.startRow) > 0) ||
287           (Bytes.compareTo(lastStopRow, range.startRow) == 0 && !(lastStopRowInclusive == false &&
288           range.isStartRowInclusive() == false))) {
289         if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
290           newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
291               range.stopRowInclusive));
292           break;
293         }
294         // if first range contains second range, ignore the second range
295         if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
296           if((Bytes.compareTo(lastStopRow, range.stopRow) == 0)) {
297             if(lastStopRowInclusive == true || range.stopRowInclusive == true) {
298               lastStopRowInclusive = true;
299             }
300           }
301           if ((i + 1) == ranges.size()) {
302             newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
303                 lastStopRowInclusive));
304           }
305         } else {
306           lastStopRow = range.stopRow;
307           lastStopRowInclusive = range.stopRowInclusive;
308           if ((i + 1) < ranges.size()) {
309             i++;
310             range = ranges.get(i);
311             if (!range.isValid()) {
312               invalidRanges.add(range);
313             }
314           } else {
315             newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
316                 lastStopRowInclusive));
317             break;
318           }
319           while ((Bytes.compareTo(lastStopRow, range.startRow) > 0) ||
320               (Bytes.compareTo(lastStopRow, range.startRow) == 0 &&
321               (lastStopRowInclusive == true || range.startRowInclusive==true))) {
322             if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
323               break;
324             }
325             // if this first range contain second range, ignore the second range
326             if (Bytes.compareTo(lastStopRow, range.stopRow) >= 0) {
327               if(lastStopRowInclusive == true || range.stopRowInclusive == true) {
328                 lastStopRowInclusive = true;
329               }
330               i++;
331               if (i < ranges.size()) {
332                 range = ranges.get(i);
333                 if (!range.isValid()) {
334                   invalidRanges.add(range);
335                 }
336               } else {
337                 break;
338               }
339             } else {
340               lastStopRow = range.stopRow;
341               lastStopRowInclusive = range.stopRowInclusive;
342               i++;
343               if (i < ranges.size()) {
344                 range = ranges.get(i);
345                 if (!range.isValid()) {
346                   invalidRanges.add(range);
347                 }
348               } else {
349                 break;
350               }
351             }
352           }
353           if(Bytes.equals(range.stopRow, HConstants.EMPTY_BYTE_ARRAY)) {
354             if((Bytes.compareTo(lastStopRow, range.startRow) < 0) ||
355                 (Bytes.compareTo(lastStopRow, range.startRow) == 0 &&
356                 lastStopRowInclusive == false && range.startRowInclusive == false)) {
357               newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
358                   lastStopRowInclusive));
359               newRanges.add(range);
360             } else {
361               newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, range.stopRow,
362                   range.stopRowInclusive));
363               break;
364             }
365           }
366           newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
367               lastStopRowInclusive));
368           if ((i + 1) == ranges.size()) {
369             newRanges.add(range);
370           }
371           lastStartRow = range.startRow;
372           lastStartRowInclusive = range.startRowInclusive;
373           lastStopRow = range.stopRow;
374           lastStopRowInclusive = range.stopRowInclusive;
375         }
376       } else {
377         newRanges.add(new RowRange(lastStartRow, lastStartRowInclusive, lastStopRow,
378             lastStopRowInclusive));
379         if ((i + 1) == ranges.size()) {
380           newRanges.add(range);
381         }
382         lastStartRow = range.startRow;
383         lastStartRowInclusive = range.startRowInclusive;
384         lastStopRow = range.stopRow;
385         lastStopRowInclusive = range.stopRowInclusive;
386       }
387     }
388     // check the remaining ranges
389     for(int j=i; j < ranges.size(); j++) {
390       if(!ranges.get(j).isValid()) {
391         invalidRanges.add(ranges.get(j));
392       }
393     }
394     // if invalid range exists, throw the exception
395     if (invalidRanges.size() != 0) {
396       throwExceptionForInvalidRanges(invalidRanges, true);
397     }
398     // If no valid ranges found, throw the exception
399     if(newRanges.size() == 0) {
400       throw new IllegalArgumentException("No valid ranges found.");
401     }
402     return newRanges;
403   }
404 
405   private static void throwExceptionForInvalidRanges(List<RowRange> invalidRanges,
406       boolean details) {
407     StringBuilder sb = new StringBuilder();
408     sb.append(invalidRanges.size()).append(" invaild ranges.\n");
409     if (details) {
410       for (RowRange range : invalidRanges) {
411         sb.append(
412             "Invalid range: start row => " + Bytes.toString(range.startRow) + ", stop row => "
413                 + Bytes.toString(range.stopRow)).append('\n');
414       }
415     }
416     throw new IllegalArgumentException(sb.toString());
417   }
418 
419   @InterfaceAudience.Public
420   @InterfaceStability.Evolving
421   public static class RowRange implements Comparable<RowRange> {
422     private byte[] startRow;
423     private boolean startRowInclusive = true;
424     private byte[] stopRow;
425     private boolean stopRowInclusive = false;
426 
427     public RowRange() {
428     }
429     /**
430      * If the startRow is empty or null, set it to HConstants.EMPTY_BYTE_ARRAY, means begin at the
431      * start row of the table. If the stopRow is empty or null, set it to
432      * HConstants.EMPTY_BYTE_ARRAY, means end of the last row of table.
433      */
434     public RowRange(String startRow, boolean startRowInclusive, String stopRow,
435         boolean stopRowInclusive) {
436       this((startRow == null || startRow.isEmpty()) ? HConstants.EMPTY_BYTE_ARRAY :
437         Bytes.toBytes(startRow), startRowInclusive,
438         (stopRow == null || stopRow.isEmpty()) ? HConstants.EMPTY_BYTE_ARRAY :
439         Bytes.toBytes(stopRow), stopRowInclusive);
440     }
441 
442     public RowRange(byte[] startRow,  boolean startRowInclusive, byte[] stopRow,
443         boolean stopRowInclusive) {
444       this.startRow = (startRow == null) ? HConstants.EMPTY_BYTE_ARRAY : startRow;
445       this.startRowInclusive = startRowInclusive;
446       this.stopRow = (stopRow == null) ? HConstants.EMPTY_BYTE_ARRAY :stopRow;
447       this.stopRowInclusive = stopRowInclusive;
448     }
449 
450     public byte[] getStartRow() {
451       return startRow;
452     }
453 
454     public byte[] getStopRow() {
455       return stopRow;
456     }
457 
458     /**
459      * @return if start row is inclusive.
460      */
461     public boolean isStartRowInclusive() {
462       return startRowInclusive;
463     }
464 
465     /**
466      * @return if stop row is inclusive.
467      */
468     public boolean isStopRowInclusive() {
469       return stopRowInclusive;
470     }
471 
472     public boolean contains(byte[] row) {
473       return contains(row, 0, row.length);
474     }
475 
476     public boolean contains(byte[] buffer, int offset, int length) {
477       if(startRowInclusive) {
478         if(stopRowInclusive) {
479           return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
480               && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
481                   Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
482         } else {
483           return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) >= 0
484               && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
485                   Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
486         }
487       } else {
488         if(stopRowInclusive) {
489           return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
490               && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
491                   Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) <= 0);
492         } else {
493           return Bytes.compareTo(buffer, offset, length, startRow, 0, startRow.length) > 0
494               && (Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY) ||
495                   Bytes.compareTo(buffer, offset, length, stopRow, 0, stopRow.length) < 0);
496         }
497       }
498     }
499 
500     @Override
501     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
502       justification="This compareTo is not of this Object, but of referenced RowRange")
503     public int compareTo(RowRange other) {
504       return Bytes.compareTo(this.startRow, other.startRow);
505     }
506 
507     public boolean isValid() {
508       return Bytes.equals(startRow, HConstants.EMPTY_BYTE_ARRAY)
509           || Bytes.equals(stopRow, HConstants.EMPTY_BYTE_ARRAY)
510           || Bytes.compareTo(startRow, stopRow) < 0
511           || (Bytes.compareTo(startRow, stopRow) == 0 && stopRowInclusive == true);
512     }
513   }
514 }