View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.filter;
20  
21  import java.util.ArrayList;
22  
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.classification.InterfaceStability;
25  import org.apache.hadoop.hbase.Cell;
26  import org.apache.hadoop.hbase.KeyValueUtil;
27  import org.apache.hadoop.hbase.exceptions.DeserializationException;
28  import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
29  import org.apache.hadoop.hbase.util.ByteStringer;
30  import org.apache.hadoop.hbase.util.Bytes;
31  
32  import com.google.common.base.Preconditions;
33  import com.google.protobuf.InvalidProtocolBufferException;
34  
35  /**
36   * A filter, based on the ColumnCountGetFilter, takes two arguments: limit and offset.
37   * This filter can be used for row-based indexing, where references to other tables are stored across many columns,
38   * in order to efficient lookups and paginated results for end users. Only most recent versions are considered
39   * for pagination.
40   */
41  @InterfaceAudience.Public
42  @InterfaceStability.Stable
43  public class ColumnPaginationFilter extends FilterBase
44  {
45    private int limit = 0;
46    private int offset = -1;
47    private byte[] columnOffset = null;
48    private int count = 0;
49  
50    /**
51     * Initializes filter with an integer offset and limit. The offset is arrived at
52     * scanning sequentially and skipping entries. @limit number of columns are
53     * then retrieved. If multiple column families are involved, the columns may be spread
54     * across them.
55     *
56     * @param limit Max number of columns to return.
57     * @param offset The integer offset where to start pagination.
58     */
59    public ColumnPaginationFilter(final int limit, final int offset)
60    {
61      Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
62      Preconditions.checkArgument(offset >= 0, "offset must be positive %s", offset);
63      this.limit = limit;
64      this.offset = offset;
65    }
66  
67    /**
68     * Initializes filter with a string/bookmark based offset and limit. The offset is arrived
69     * at, by seeking to it using scanner hints. If multiple column families are involved,
70     * pagination starts at the first column family which contains @columnOffset. Columns are
71     * then retrieved sequentially upto @limit number of columns which maybe spread across
72     * multiple column families, depending on how the scan is setup.
73     *
74     * @param limit Max number of columns to return.
75     * @param columnOffset The string/bookmark offset on where to start pagination.
76     */
77    public ColumnPaginationFilter(final int limit, final byte[] columnOffset) {
78      Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
79      Preconditions.checkArgument(columnOffset != null,
80                                  "columnOffset must be non-null %s",
81                                  columnOffset);
82      this.limit = limit;
83      this.columnOffset = columnOffset;
84    }
85  
86    /**
87     * @return limit
88     */
89    public int getLimit() {
90      return limit;
91    }
92  
93    /**
94     * @return offset
95     */
96    public int getOffset() {
97      return offset;
98    }
99  
100   /**
101    * @return columnOffset
102    */
103   public byte[] getColumnOffset() {
104     return columnOffset;
105   }
106 
107   @Override
108   public ReturnCode filterKeyValue(Cell v)
109   {
110     if (columnOffset != null) {
111       if (count >= limit) {
112         return ReturnCode.NEXT_ROW;
113       }
114       byte[] buffer = v.getQualifierArray();
115       if (buffer == null) {
116         return ReturnCode.SEEK_NEXT_USING_HINT;
117       }
118       int cmp = 0;
119       // Only compare if no KV's have been seen so far.
120       if (count == 0) {
121         cmp = Bytes.compareTo(buffer,
122                               v.getQualifierOffset(),
123                               v.getQualifierLength(),
124                               this.columnOffset,
125                               0,
126                               this.columnOffset.length);
127       }
128       if (cmp < 0) {
129         return ReturnCode.SEEK_NEXT_USING_HINT;
130       } else {
131         count++;
132         return ReturnCode.INCLUDE_AND_NEXT_COL;
133       }
134     } else {
135       if (count >= offset + limit) {
136         return ReturnCode.NEXT_ROW;
137       }
138 
139       ReturnCode code = count < offset ? ReturnCode.NEXT_COL :
140                                          ReturnCode.INCLUDE_AND_NEXT_COL;
141       count++;
142       return code;
143     }
144   }
145 
146   @Override
147   public Cell getNextCellHint(Cell cell) {
148     return KeyValueUtil.createFirstOnRow(
149         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(),
150         cell.getFamilyOffset(), cell.getFamilyLength(), columnOffset, 0, columnOffset.length);
151   }
152 
153   @Override
154   public void reset()
155   {
156     this.count = 0;
157   }
158 
159   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
160     Preconditions.checkArgument(filterArguments.size() == 2,
161                                 "Expected 2 but got: %s", filterArguments.size());
162     int limit = ParseFilter.convertByteArrayToInt(filterArguments.get(0));
163     int offset = ParseFilter.convertByteArrayToInt(filterArguments.get(1));
164     return new ColumnPaginationFilter(limit, offset);
165   }
166 
167   /**
168    * @return The filter serialized using pb
169    */
170   public byte [] toByteArray() {
171     FilterProtos.ColumnPaginationFilter.Builder builder =
172       FilterProtos.ColumnPaginationFilter.newBuilder();
173     builder.setLimit(this.limit);
174     if (this.offset >= 0) {
175       builder.setOffset(this.offset);
176     }
177     if (this.columnOffset != null) {
178       builder.setColumnOffset(ByteStringer.wrap(this.columnOffset));
179     }
180     return builder.build().toByteArray();
181   }
182 
183   /**
184    * @param pbBytes A pb serialized {@link ColumnPaginationFilter} instance
185    * @return An instance of {@link ColumnPaginationFilter} made from <code>bytes</code>
186    * @throws DeserializationException
187    * @see #toByteArray
188    */
189   public static ColumnPaginationFilter parseFrom(final byte [] pbBytes)
190   throws DeserializationException {
191     FilterProtos.ColumnPaginationFilter proto;
192     try {
193       proto = FilterProtos.ColumnPaginationFilter.parseFrom(pbBytes);
194     } catch (InvalidProtocolBufferException e) {
195       throw new DeserializationException(e);
196     }
197     if (proto.hasColumnOffset()) {
198       return new ColumnPaginationFilter(proto.getLimit(),
199                                         proto.getColumnOffset().toByteArray());
200     }
201     return new ColumnPaginationFilter(proto.getLimit(),proto.getOffset());
202   }
203 
204   /**
205    * @param other
206    * @return true if and only if the fields of the filter that are serialized
207    * are equal to the corresponding fields in other.  Used for testing.
208    */
209   boolean areSerializedFieldsEqual(Filter o) {
210     if (o == this) return true;
211     if (!(o instanceof ColumnPaginationFilter)) return false;
212 
213     ColumnPaginationFilter other = (ColumnPaginationFilter)o;
214     if (this.columnOffset != null) {
215       return this.getLimit() == this.getLimit() &&
216           Bytes.equals(this.getColumnOffset(), other.getColumnOffset());
217     }
218     return this.getLimit() == other.getLimit() && this.getOffset() == other.getOffset();
219   }
220 
221   @Override
222   public String toString() {
223     if (this.columnOffset != null) {
224       return (this.getClass().getSimpleName() + "(" + this.limit + ", " +
225           Bytes.toStringBinary(this.columnOffset) + ")");
226     }
227     return String.format("%s (%d, %d)", this.getClass().getSimpleName(),
228         this.limit, this.offset);
229   }
230 }