View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.filter;
20  
21  import java.util.ArrayList;
22  
23  import org.apache.hadoop.hbase.util.ByteStringer;
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.classification.InterfaceStability;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.KeyValueUtil;
28  import org.apache.hadoop.hbase.exceptions.DeserializationException;
29  import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
30  import org.apache.hadoop.hbase.util.Bytes;
31  
32  import com.google.common.base.Preconditions;
33  import com.google.protobuf.InvalidProtocolBufferException;
34  
35  /**
36   * A filter, based on the ColumnCountGetFilter, takes two arguments: limit and offset.
37   * This filter can be used for row-based indexing, where references to other tables are stored across many columns,
38   * in order to efficient lookups and paginated results for end users. Only most recent versions are considered
39   * for pagination.
40   */
41  @InterfaceAudience.Public
42  @InterfaceStability.Stable
43  public class ColumnPaginationFilter extends FilterBase
44  {
45    private int limit = 0;
46    private int offset = -1;
47    private byte[] columnOffset = null;
48    private int count = 0;
49  
50    /**
51     * Initializes filter with an integer offset and limit. The offset is arrived at
52     * scanning sequentially and skipping entries. @limit number of columns are
53     * then retrieved. If multiple column families are involved, the columns may be spread
54     * across them.
55     *
56     * @param limit Max number of columns to return.
57     * @param offset The integer offset where to start pagination.
58     */
59    public ColumnPaginationFilter(final int limit, final int offset)
60    {
61      Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
62      Preconditions.checkArgument(offset >= 0, "offset must be positive %s", offset);
63      this.limit = limit;
64      this.offset = offset;
65    }
66  
67    /**
68     * Initializes filter with a string/bookmark based offset and limit. The offset is arrived
69     * at, by seeking to it using scanner hints. If multiple column families are involved,
70     * pagination starts at the first column family which contains @columnOffset. Columns are
71     * then retrieved sequentially upto @limit number of columns which maybe spread across
72     * multiple column families, depending on how the scan is setup.
73     *
74     * @param limit Max number of columns to return.
75     * @param columnOffset The string/bookmark offset on where to start pagination.
76     */
77    public ColumnPaginationFilter(final int limit, final byte[] columnOffset) {
78      Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
79      Preconditions.checkArgument(columnOffset != null,
80                                  "columnOffset must be non-null %s",
81                                  columnOffset);
82      this.limit = limit;
83      this.columnOffset = columnOffset;
84    }
85  
86    /**
87     * @return limit
88     */
89    public int getLimit() {
90      return limit;
91    }
92  
93    /**
94     * @return offset
95     */
96    public int getOffset() {
97      return offset;
98    }
99  
100   /**
101    * @return columnOffset
102    */
103   public byte[] getColumnOffset() {
104     return columnOffset;
105   }
106 
107   @Override
108   public ReturnCode filterKeyValue(Cell v)
109   {
110     if (columnOffset != null) {
111       if (count >= limit) {
112         return ReturnCode.NEXT_ROW;
113       }
114       byte[] buffer = v.getQualifierArray();
115       if (buffer == null) {
116         return ReturnCode.SEEK_NEXT_USING_HINT;
117       }
118       int cmp = 0;
119       // Only compare if no KV's have been seen so far.
120       if (count == 0) {
121         cmp = Bytes.compareTo(buffer,
122                               v.getQualifierOffset(),
123                               v.getQualifierLength(),
124                               this.columnOffset,
125                               0,
126                               this.columnOffset.length);
127       }
128       if (cmp < 0) {
129         return ReturnCode.SEEK_NEXT_USING_HINT;
130       } else {
131         count++;
132         return ReturnCode.INCLUDE_AND_NEXT_COL;
133       }
134     } else {
135       if (count >= offset + limit) {
136         return ReturnCode.NEXT_ROW;
137       }
138 
139       ReturnCode code = count < offset ? ReturnCode.NEXT_COL :
140                                          ReturnCode.INCLUDE_AND_NEXT_COL;
141       count++;
142       return code;
143     }
144   }
145 
146   // Override here explicitly as the method in super class FilterBase might do a KeyValue recreate.
147   // See HBASE-12068
148   @Override
149   public Cell transformCell(Cell v) {
150     return v;
151   }
152 
153   @Override
154   public Cell getNextCellHint(Cell kv) {
155     return KeyValueUtil.createFirstOnRow(
156         kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), kv.getFamilyArray(),
157         kv.getFamilyOffset(), kv.getFamilyLength(), columnOffset, 0, columnOffset.length);
158   }
159 
160   @Override
161   public void reset()
162   {
163     this.count = 0;
164   }
165 
166   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
167     Preconditions.checkArgument(filterArguments.size() == 2,
168                                 "Expected 2 but got: %s", filterArguments.size());
169     int limit = ParseFilter.convertByteArrayToInt(filterArguments.get(0));
170     int offset = ParseFilter.convertByteArrayToInt(filterArguments.get(1));
171     return new ColumnPaginationFilter(limit, offset);
172   }
173 
174   /**
175    * @return The filter serialized using pb
176    */
177   public byte [] toByteArray() {
178     FilterProtos.ColumnPaginationFilter.Builder builder =
179       FilterProtos.ColumnPaginationFilter.newBuilder();
180     builder.setLimit(this.limit);
181     if (this.offset >= 0) {
182       builder.setOffset(this.offset);
183     }
184     if (this.columnOffset != null) {
185       builder.setColumnOffset(ByteStringer.wrap(this.columnOffset));
186     }
187     return builder.build().toByteArray();
188   }
189 
190   /**
191    * @param pbBytes A pb serialized {@link ColumnPaginationFilter} instance
192    * @return An instance of {@link ColumnPaginationFilter} made from <code>bytes</code>
193    * @throws DeserializationException
194    * @see #toByteArray
195    */
196   public static ColumnPaginationFilter parseFrom(final byte [] pbBytes)
197   throws DeserializationException {
198     FilterProtos.ColumnPaginationFilter proto;
199     try {
200       proto = FilterProtos.ColumnPaginationFilter.parseFrom(pbBytes);
201     } catch (InvalidProtocolBufferException e) {
202       throw new DeserializationException(e);
203     }
204     if (proto.hasColumnOffset()) {
205       return new ColumnPaginationFilter(proto.getLimit(),
206                                         proto.getColumnOffset().toByteArray());
207     }
208     return new ColumnPaginationFilter(proto.getLimit(),proto.getOffset());
209   }
210 
211   /**
212    * @param other
213    * @return true if and only if the fields of the filter that are serialized
214    * are equal to the corresponding fields in other.  Used for testing.
215    */
216   boolean areSerializedFieldsEqual(Filter o) {
217     if (o == this) return true;
218     if (!(o instanceof ColumnPaginationFilter)) return false;
219 
220     ColumnPaginationFilter other = (ColumnPaginationFilter)o;
221     if (this.columnOffset != null) {
222       return this.getLimit() == this.getLimit() &&
223           Bytes.equals(this.getColumnOffset(), other.getColumnOffset());
224     }
225     return this.getLimit() == other.getLimit() && this.getOffset() == other.getOffset();
226   }
227 
228   @Override
229   public String toString() {
230     if (this.columnOffset != null) {
231       return (this.getClass().getSimpleName() + "(" + this.limit + ", " +
232           Bytes.toStringBinary(this.columnOffset) + ")");
233     }
234     return String.format("%s (%d, %d)", this.getClass().getSimpleName(),
235         this.limit, this.offset);
236   }
237 }