001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.filter;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Objects;
024
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.PrivateCellUtil;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.hadoop.hbase.exceptions.DeserializationException;
030import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
031import org.apache.hadoop.hbase.util.Bytes;
032
033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
034import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
035import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
036
037/**
038 * A filter, based on the ColumnCountGetFilter, takes two arguments: limit and offset.
039 * This filter can be used for row-based indexing, where references to other tables are stored across many columns,
040 * in order to efficient lookups and paginated results for end users. Only most recent versions are considered
041 * for pagination.
042 */
043@InterfaceAudience.Public
044public class ColumnPaginationFilter extends FilterBase {
045
046  private int limit = 0;
047  private int offset = -1;
048  private byte[] columnOffset = null;
049  private int count = 0;
050
051  /**
052   * Initializes filter with an integer offset and limit. The offset is arrived at
053   * scanning sequentially and skipping entries. @limit number of columns are
054   * then retrieved. If multiple column families are involved, the columns may be spread
055   * across them.
056   *
057   * @param limit Max number of columns to return.
058   * @param offset The integer offset where to start pagination.
059   */
060  public ColumnPaginationFilter(final int limit, final int offset)
061  {
062    Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
063    Preconditions.checkArgument(offset >= 0, "offset must be positive %s", offset);
064    this.limit = limit;
065    this.offset = offset;
066  }
067
068  /**
069   * Initializes filter with a string/bookmark based offset and limit. The offset is arrived
070   * at, by seeking to it using scanner hints. If multiple column families are involved,
071   * pagination starts at the first column family which contains @columnOffset. Columns are
072   * then retrieved sequentially upto @limit number of columns which maybe spread across
073   * multiple column families, depending on how the scan is setup.
074   *
075   * @param limit Max number of columns to return.
076   * @param columnOffset The string/bookmark offset on where to start pagination.
077   */
078  public ColumnPaginationFilter(final int limit, final byte[] columnOffset) {
079    Preconditions.checkArgument(limit >= 0, "limit must be positive %s", limit);
080    Preconditions.checkArgument(columnOffset != null,
081                                "columnOffset must be non-null %s",
082                                columnOffset);
083    this.limit = limit;
084    this.columnOffset = columnOffset;
085  }
086
087  /**
088   * @return limit
089   */
090  public int getLimit() {
091    return limit;
092  }
093
094  /**
095   * @return offset
096   */
097  public int getOffset() {
098    return offset;
099  }
100
101  /**
102   * @return columnOffset
103   */
104  public byte[] getColumnOffset() {
105    return columnOffset;
106  }
107
108  @Override
109  public boolean filterRowKey(Cell cell) throws IOException {
110    // Impl in FilterBase might do unnecessary copy for Off heap backed Cells.
111    return false;
112  }
113
114  @Override
115  @Deprecated
116  public ReturnCode filterKeyValue(final Cell c) {
117    return filterCell(c);
118  }
119
120  @Override
121  public ReturnCode filterCell(final Cell c)
122  {
123    if (columnOffset != null) {
124      if (count >= limit) {
125        return ReturnCode.NEXT_ROW;
126      }
127      int cmp = 0;
128      // Only compare if no KV's have been seen so far.
129      if (count == 0) {
130        cmp = CellUtil.compareQualifiers(c, this.columnOffset, 0, this.columnOffset.length);
131      }
132      if (cmp < 0) {
133        return ReturnCode.SEEK_NEXT_USING_HINT;
134      } else {
135        count++;
136        return ReturnCode.INCLUDE_AND_NEXT_COL;
137      }
138    } else {
139      if (count >= offset + limit) {
140        return ReturnCode.NEXT_ROW;
141      }
142
143      ReturnCode code = count < offset ? ReturnCode.NEXT_COL :
144                                         ReturnCode.INCLUDE_AND_NEXT_COL;
145      count++;
146      return code;
147    }
148  }
149
150  @Override
151  public Cell getNextCellHint(Cell cell) {
152    return PrivateCellUtil.createFirstOnRowCol(cell, columnOffset, 0, columnOffset.length);
153  }
154
155  @Override
156  public void reset()
157  {
158    this.count = 0;
159  }
160
161  public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
162    Preconditions.checkArgument(filterArguments.size() == 2,
163                                "Expected 2 but got: %s", filterArguments.size());
164    int limit = ParseFilter.convertByteArrayToInt(filterArguments.get(0));
165    int offset = ParseFilter.convertByteArrayToInt(filterArguments.get(1));
166    return new ColumnPaginationFilter(limit, offset);
167  }
168
169  /**
170   * @return The filter serialized using pb
171   */
172  @Override
173  public byte [] toByteArray() {
174    FilterProtos.ColumnPaginationFilter.Builder builder =
175      FilterProtos.ColumnPaginationFilter.newBuilder();
176    builder.setLimit(this.limit);
177    if (this.offset >= 0) {
178      builder.setOffset(this.offset);
179    }
180    if (this.columnOffset != null) {
181      builder.setColumnOffset(UnsafeByteOperations.unsafeWrap(this.columnOffset));
182    }
183    return builder.build().toByteArray();
184  }
185
186  /**
187   * @param pbBytes A pb serialized {@link ColumnPaginationFilter} instance
188   * @return An instance of {@link ColumnPaginationFilter} made from <code>bytes</code>
189   * @throws DeserializationException
190   * @see #toByteArray
191   */
192  public static ColumnPaginationFilter parseFrom(final byte [] pbBytes)
193  throws DeserializationException {
194    FilterProtos.ColumnPaginationFilter proto;
195    try {
196      proto = FilterProtos.ColumnPaginationFilter.parseFrom(pbBytes);
197    } catch (InvalidProtocolBufferException e) {
198      throw new DeserializationException(e);
199    }
200    if (proto.hasColumnOffset()) {
201      return new ColumnPaginationFilter(proto.getLimit(),
202                                        proto.getColumnOffset().toByteArray());
203    }
204    return new ColumnPaginationFilter(proto.getLimit(),proto.getOffset());
205  }
206
207  /**
208   * @param o the other filter to compare with
209   * @return true if and only if the fields of the filter that are serialized
210   * are equal to the corresponding fields in other.  Used for testing.
211   */
212  @Override
213  boolean areSerializedFieldsEqual(Filter o) {
214    if (o == this) return true;
215    if (!(o instanceof ColumnPaginationFilter)) return false;
216
217    ColumnPaginationFilter other = (ColumnPaginationFilter)o;
218    if (this.columnOffset != null) {
219      return this.getLimit() == other.getLimit() &&
220          Bytes.equals(this.getColumnOffset(), other.getColumnOffset());
221    }
222    return this.getLimit() == other.getLimit() && this.getOffset() == other.getOffset();
223  }
224
225  @Override
226  public String toString() {
227    if (this.columnOffset != null) {
228      return (this.getClass().getSimpleName() + "(" + this.limit + ", " +
229          Bytes.toStringBinary(this.columnOffset) + ")");
230    }
231    return String.format("%s (%d, %d)", this.getClass().getSimpleName(),
232        this.limit, this.offset);
233  }
234
235  @Override
236  public boolean equals(Object obj) {
237    return obj instanceof Filter && areSerializedFieldsEqual((Filter) obj);
238  }
239
240  @Override
241  public int hashCode() {
242    return columnOffset == null ? Objects.hash(this.limit, this.offset) :
243      Objects.hash(this.limit, Bytes.hashCode(this.columnOffset));
244  }
245}