View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.filter;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  
24  import org.apache.hadoop.hbase.Cell;
25  import org.apache.hadoop.hbase.KeyValueUtil;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.hbase.exceptions.DeserializationException;
29  import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
30  import org.apache.hadoop.hbase.util.ByteStringer;
31  import org.apache.hadoop.hbase.util.Bytes;
32  
33  import com.google.common.base.Preconditions;
34  import com.google.protobuf.InvalidProtocolBufferException;
35  
36  /**
37   * A filter, based on the ColumnCountGetFilter, takes two arguments: limit and offset.
38   * This filter can be used for row-based indexing, where references to other tables are stored across many columns,
39   * in order to efficient lookups and paginated results for end users. Only most recent versions are considered
40   * for pagination.
41   */
42  @InterfaceAudience.Public
43  @InterfaceStability.Stable
44  public class ColumnPaginationFilter extends FilterBase {
45  
46    private int limit = 0;
47    private int offset = -1;
48    private byte[] columnOffset = null;
49    private int count = 0;
50  
51    /**
52     * Initializes filter with an integer offset and limit. The offset is arrived at
53     * scanning sequentially and skipping entries. @limit number of columns are
54     * then retrieved. If multiple column families are involved, the columns may be spread
55     * across them.
56     *
57     * @param limit Max number of columns to return.
58     * @param offset The integer offset where to start pagination.
59     */
60    public ColumnPaginationFilter(final int limit, final int offset)
61    {
62      Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
63      Preconditions.checkArgument(offset >= 0, "offset must be positive %s", offset);
64      this.limit = limit;
65      this.offset = offset;
66    }
67  
68    /**
69     * Initializes filter with a string/bookmark based offset and limit. The offset is arrived
70     * at, by seeking to it using scanner hints. If multiple column families are involved,
71     * pagination starts at the first column family which contains @columnOffset. Columns are
72     * then retrieved sequentially upto @limit number of columns which maybe spread across
73     * multiple column families, depending on how the scan is setup.
74     *
75     * @param limit Max number of columns to return.
76     * @param columnOffset The string/bookmark offset on where to start pagination.
77     */
78    public ColumnPaginationFilter(final int limit, final byte[] columnOffset) {
79      Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
80      Preconditions.checkArgument(columnOffset != null,
81                                  "columnOffset must be non-null %s",
82                                  columnOffset);
83      this.limit = limit;
84      this.columnOffset = columnOffset;
85    }
86  
87    /**
88     * @return limit
89     */
90    public int getLimit() {
91      return limit;
92    }
93  
94    /**
95     * @return offset
96     */
97    public int getOffset() {
98      return offset;
99    }
100 
101   /**
102    * @return columnOffset
103    */
104   public byte[] getColumnOffset() {
105     return columnOffset;
106   }
107 
108   @Override
109   public boolean filterRowKey(Cell cell) throws IOException {
110     // Impl in FilterBase might do unnecessary copy for Off heap backed Cells.
111     return false;
112   }
113 
114   @Override
115   public ReturnCode filterKeyValue(Cell v)
116   {
117     if (columnOffset != null) {
118       if (count >= limit) {
119         return ReturnCode.NEXT_ROW;
120       }
121       byte[] buffer = v.getQualifierArray();
122       if (buffer == null) {
123         return ReturnCode.SEEK_NEXT_USING_HINT;
124       }
125       int cmp = 0;
126       // Only compare if no KV's have been seen so far.
127       if (count == 0) {
128         cmp = Bytes.compareTo(buffer,
129                               v.getQualifierOffset(),
130                               v.getQualifierLength(),
131                               this.columnOffset,
132                               0,
133                               this.columnOffset.length);
134       }
135       if (cmp < 0) {
136         return ReturnCode.SEEK_NEXT_USING_HINT;
137       } else {
138         count++;
139         return ReturnCode.INCLUDE_AND_NEXT_COL;
140       }
141     } else {
142       if (count >= offset + limit) {
143         return ReturnCode.NEXT_ROW;
144       }
145 
146       ReturnCode code = count < offset ? ReturnCode.NEXT_COL :
147                                          ReturnCode.INCLUDE_AND_NEXT_COL;
148       count++;
149       return code;
150     }
151   }
152 
153   @Override
154   public Cell getNextCellHint(Cell cell) {
155     return KeyValueUtil.createFirstOnRow(
156         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getFamilyArray(),
157         cell.getFamilyOffset(), cell.getFamilyLength(), columnOffset, 0, columnOffset.length);
158   }
159 
160   @Override
161   public void reset()
162   {
163     this.count = 0;
164   }
165 
166   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
167     Preconditions.checkArgument(filterArguments.size() == 2,
168                                 "Expected 2 but got: %s", filterArguments.size());
169     int limit = ParseFilter.convertByteArrayToInt(filterArguments.get(0));
170     int offset = ParseFilter.convertByteArrayToInt(filterArguments.get(1));
171     return new ColumnPaginationFilter(limit, offset);
172   }
173 
174   /**
175    * @return The filter serialized using pb
176    */
177   public byte [] toByteArray() {
178     FilterProtos.ColumnPaginationFilter.Builder builder =
179       FilterProtos.ColumnPaginationFilter.newBuilder();
180     builder.setLimit(this.limit);
181     if (this.offset >= 0) {
182       builder.setOffset(this.offset);
183     }
184     if (this.columnOffset != null) {
185       builder.setColumnOffset(ByteStringer.wrap(this.columnOffset));
186     }
187     return builder.build().toByteArray();
188   }
189 
190   /**
191    * @param pbBytes A pb serialized {@link ColumnPaginationFilter} instance
192    * @return An instance of {@link ColumnPaginationFilter} made from <code>bytes</code>
193    * @throws DeserializationException
194    * @see #toByteArray
195    */
196   public static ColumnPaginationFilter parseFrom(final byte [] pbBytes)
197   throws DeserializationException {
198     FilterProtos.ColumnPaginationFilter proto;
199     try {
200       proto = FilterProtos.ColumnPaginationFilter.parseFrom(pbBytes);
201     } catch (InvalidProtocolBufferException e) {
202       throw new DeserializationException(e);
203     }
204     if (proto.hasColumnOffset()) {
205       return new ColumnPaginationFilter(proto.getLimit(),
206                                         proto.getColumnOffset().toByteArray());
207     }
208     return new ColumnPaginationFilter(proto.getLimit(),proto.getOffset());
209   }
210 
211   /**
212    * @param other
213    * @return true if and only if the fields of the filter that are serialized
214    * are equal to the corresponding fields in other.  Used for testing.
215    */
216   boolean areSerializedFieldsEqual(Filter o) {
217     if (o == this) return true;
218     if (!(o instanceof ColumnPaginationFilter)) return false;
219 
220     ColumnPaginationFilter other = (ColumnPaginationFilter)o;
221     if (this.columnOffset != null) {
222       return this.getLimit() == this.getLimit() &&
223           Bytes.equals(this.getColumnOffset(), other.getColumnOffset());
224     }
225     return this.getLimit() == other.getLimit() && this.getOffset() == other.getOffset();
226   }
227 
228   @Override
229   public String toString() {
230     if (this.columnOffset != null) {
231       return (this.getClass().getSimpleName() + "(" + this.limit + ", " +
232           Bytes.toStringBinary(this.columnOffset) + ")");
233     }
234     return String.format("%s (%d, %d)", this.getClass().getSimpleName(),
235         this.limit, this.offset);
236   }
237 }