View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.filter;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.CellComparator;
26  import org.apache.hadoop.hbase.CellUtil;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.classification.InterfaceStability;
29  import org.apache.hadoop.hbase.exceptions.DeserializationException;
30  import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
31  import org.apache.hadoop.hbase.util.ByteStringer;
32  import org.apache.hadoop.hbase.util.Bytes;
33  
34  import com.google.common.base.Preconditions;
35  import com.google.protobuf.InvalidProtocolBufferException;
36  
37  /**
38   * A filter, based on the ColumnCountGetFilter, takes two arguments: limit and offset.
39   * This filter can be used for row-based indexing, where references to other tables are stored across many columns,
40   * in order to efficient lookups and paginated results for end users. Only most recent versions are considered
41   * for pagination.
42   */
43  @InterfaceAudience.Public
44  @InterfaceStability.Stable
45  public class ColumnPaginationFilter extends FilterBase {
46  
47    private int limit = 0;
48    private int offset = -1;
49    private byte[] columnOffset = null;
50    private int count = 0;
51  
52    /**
53     * Initializes filter with an integer offset and limit. The offset is arrived at
54     * scanning sequentially and skipping entries. @limit number of columns are
55     * then retrieved. If multiple column families are involved, the columns may be spread
56     * across them.
57     *
58     * @param limit Max number of columns to return.
59     * @param offset The integer offset where to start pagination.
60     */
61    public ColumnPaginationFilter(final int limit, final int offset)
62    {
63      Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
64      Preconditions.checkArgument(offset >= 0, "offset must be positive %s", offset);
65      this.limit = limit;
66      this.offset = offset;
67    }
68  
69    /**
70     * Initializes filter with a string/bookmark based offset and limit. The offset is arrived
71     * at, by seeking to it using scanner hints. If multiple column families are involved,
72     * pagination starts at the first column family which contains @columnOffset. Columns are
73     * then retrieved sequentially upto @limit number of columns which maybe spread across
74     * multiple column families, depending on how the scan is setup.
75     *
76     * @param limit Max number of columns to return.
77     * @param columnOffset The string/bookmark offset on where to start pagination.
78     */
79    public ColumnPaginationFilter(final int limit, final byte[] columnOffset) {
80      Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
81      Preconditions.checkArgument(columnOffset != null,
82                                  "columnOffset must be non-null %s",
83                                  columnOffset);
84      this.limit = limit;
85      this.columnOffset = columnOffset;
86    }
87  
88    /**
89     * @return limit
90     */
91    public int getLimit() {
92      return limit;
93    }
94  
95    /**
96     * @return offset
97     */
98    public int getOffset() {
99      return offset;
100   }
101 
102   /**
103    * @return columnOffset
104    */
105   public byte[] getColumnOffset() {
106     return columnOffset;
107   }
108 
109   @Override
110   public boolean filterRowKey(Cell cell) throws IOException {
111     // Impl in FilterBase might do unnecessary copy for Off heap backed Cells.
112     return false;
113   }
114 
115   @Override
116   public ReturnCode filterKeyValue(Cell v)
117   {
118     if (columnOffset != null) {
119       if (count >= limit) {
120         return ReturnCode.NEXT_ROW;
121       }
122       int cmp = 0;
123       // Only compare if no KV's have been seen so far.
124       if (count == 0) {
125         cmp = CellComparator.compareQualifiers(v, this.columnOffset, 0, this.columnOffset.length);
126       }
127       if (cmp < 0) {
128         return ReturnCode.SEEK_NEXT_USING_HINT;
129       } else {
130         count++;
131         return ReturnCode.INCLUDE_AND_NEXT_COL;
132       }
133     } else {
134       if (count >= offset + limit) {
135         return ReturnCode.NEXT_ROW;
136       }
137 
138       ReturnCode code = count < offset ? ReturnCode.NEXT_COL :
139                                          ReturnCode.INCLUDE_AND_NEXT_COL;
140       count++;
141       return code;
142     }
143   }
144 
145   @Override
146   public Cell getNextCellHint(Cell cell) {
147     return CellUtil.createFirstOnRowCol(cell, columnOffset, 0, columnOffset.length);
148   }
149 
150   @Override
151   public void reset()
152   {
153     this.count = 0;
154   }
155 
156   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
157     Preconditions.checkArgument(filterArguments.size() == 2,
158                                 "Expected 2 but got: %s", filterArguments.size());
159     int limit = ParseFilter.convertByteArrayToInt(filterArguments.get(0));
160     int offset = ParseFilter.convertByteArrayToInt(filterArguments.get(1));
161     return new ColumnPaginationFilter(limit, offset);
162   }
163 
164   /**
165    * @return The filter serialized using pb
166    */
167   public byte [] toByteArray() {
168     FilterProtos.ColumnPaginationFilter.Builder builder =
169       FilterProtos.ColumnPaginationFilter.newBuilder();
170     builder.setLimit(this.limit);
171     if (this.offset >= 0) {
172       builder.setOffset(this.offset);
173     }
174     if (this.columnOffset != null) {
175       builder.setColumnOffset(ByteStringer.wrap(this.columnOffset));
176     }
177     return builder.build().toByteArray();
178   }
179 
180   /**
181    * @param pbBytes A pb serialized {@link ColumnPaginationFilter} instance
182    * @return An instance of {@link ColumnPaginationFilter} made from <code>bytes</code>
183    * @throws DeserializationException
184    * @see #toByteArray
185    */
186   public static ColumnPaginationFilter parseFrom(final byte [] pbBytes)
187   throws DeserializationException {
188     FilterProtos.ColumnPaginationFilter proto;
189     try {
190       proto = FilterProtos.ColumnPaginationFilter.parseFrom(pbBytes);
191     } catch (InvalidProtocolBufferException e) {
192       throw new DeserializationException(e);
193     }
194     if (proto.hasColumnOffset()) {
195       return new ColumnPaginationFilter(proto.getLimit(),
196                                         proto.getColumnOffset().toByteArray());
197     }
198     return new ColumnPaginationFilter(proto.getLimit(),proto.getOffset());
199   }
200 
201   /**
202    * @param other
203    * @return true if and only if the fields of the filter that are serialized
204    * are equal to the corresponding fields in other.  Used for testing.
205    */
206   boolean areSerializedFieldsEqual(Filter o) {
207     if (o == this) return true;
208     if (!(o instanceof ColumnPaginationFilter)) return false;
209 
210     ColumnPaginationFilter other = (ColumnPaginationFilter)o;
211     if (this.columnOffset != null) {
212       return this.getLimit() == this.getLimit() &&
213           Bytes.equals(this.getColumnOffset(), other.getColumnOffset());
214     }
215     return this.getLimit() == other.getLimit() && this.getOffset() == other.getOffset();
216   }
217 
218   @Override
219   public String toString() {
220     if (this.columnOffset != null) {
221       return (this.getClass().getSimpleName() + "(" + this.limit + ", " +
222           Bytes.toStringBinary(this.columnOffset) + ")");
223     }
224     return String.format("%s (%d, %d)", this.getClass().getSimpleName(),
225         this.limit, this.offset);
226   }
227 }