001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.querymatcher;
019
020import java.io.IOException;
021import java.util.NavigableSet;
022
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.CellUtil;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.PrivateCellUtil;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
029
030/**
031 * This class is used for the tracking and enforcement of columns and numbers of versions during the
032 * course of a Get or Scan operation, when explicit column qualifiers have been asked for in the
033 * query. With a little magic (see {@link ScanQueryMatcher}), we can use this matcher for both scans
034 * and gets. The main difference is 'next' and 'done' collapse for the scan case (since we see all
035 * columns in order), and we only reset between rows.
036 * <p>
037 * This class is utilized by {@link ScanQueryMatcher} mainly through two methods:
038 * <ul>
039 * <li>{@link #checkColumn} is called when a Put satisfies all other conditions of the query.</li>
040 * <li>{@link #getNextRowOrNextColumn} is called whenever ScanQueryMatcher believes that the current
041 * column should be skipped (by timestamp, filter etc.)</li>
042 * </ul>
043 * <p>
044 * These two methods returns a
045 * {@link org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode} to define
046 * what action should be taken.
047 * <p>
048 * This class is NOT thread-safe as queries are never multi-threaded
049 */
050@InterfaceAudience.Private
051public class ExplicitColumnTracker implements ColumnTracker {
052
053  private final int maxVersions;
054  private final int minVersions;
055
056  /**
057   * Contains the list of columns that the ExplicitColumnTracker is tracking. Each ColumnCount
058   * instance also tracks how many versions of the requested column have been returned.
059   */
060  private final ColumnCount[] columns;
061  private int index;
062  private ColumnCount column;
063  /**
064   * Keeps track of the latest timestamp included for current column. Used to eliminate duplicates.
065   */
066  private long latestTSOfCurrentColumn;
067  private long oldestStamp;
068
069  /**
070   * Default constructor.
071   * @param columns columns specified user in query
072   * @param minVersions minimum number of versions to keep
073   * @param maxVersions maximum versions to return per column
074   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
075   */
076  public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions, int maxVersions,
077      long oldestUnexpiredTS) {
078    this.maxVersions = maxVersions;
079    this.minVersions = minVersions;
080    this.oldestStamp = oldestUnexpiredTS;
081    this.columns = new ColumnCount[columns.size()];
082    int i = 0;
083    for (byte[] column : columns) {
084      this.columns[i++] = new ColumnCount(column);
085    }
086    reset();
087  }
088
089  /**
090   * Done when there are no more columns to match against.
091   */
092  @Override
093  public boolean done() {
094    return this.index >= columns.length;
095  }
096
097  @Override
098  public ColumnCount getColumnHint() {
099    return this.column;
100  }
101
102  /**
103   * {@inheritDoc}
104   */
105  @Override
106  public ScanQueryMatcher.MatchCode checkColumn(Cell cell, byte type) {
107    // delete markers should never be passed to an
108    // *Explicit*ColumnTracker
109    assert !PrivateCellUtil.isDelete(type);
110    do {
111      // No more columns left, we are done with this query
112      if (done()) {
113        return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
114      }
115
116      // No more columns to match against, done with storefile
117      if (this.column == null) {
118        return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
119      }
120
121      // Compare specific column to current column
122      int ret = CellUtil.compareQualifiers(cell, column.getBuffer(), column.getOffset(),
123        column.getLength());
124
125      // Column Matches. Return include code. The caller would call checkVersions
126      // to limit the number of versions.
127      if (ret == 0) {
128        return ScanQueryMatcher.MatchCode.INCLUDE;
129      }
130
131      resetTS();
132
133      if (ret < 0) {
134        // The current KV is smaller than the column the ExplicitColumnTracker
135        // is interested in, so seek to that column of interest.
136        return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
137      }
138
139      // The current KV is bigger than the column the ExplicitColumnTracker
140      // is interested in. That means there is no more data for the column
141      // of interest. Advance the ExplicitColumnTracker state to next
142      // column of interest, and check again.
143      if (ret > 0) {
144        ++this.index;
145        if (done()) {
146          // No more to match, do not include, done with this row.
147          return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
148        }
149        // This is the recursive case.
150        this.column = this.columns[this.index];
151      }
152    } while (true);
153  }
154
155  @Override
156  public ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type,
157      boolean ignoreCount) throws IOException {
158    assert !PrivateCellUtil.isDelete(type);
159    if (ignoreCount) {
160      return ScanQueryMatcher.MatchCode.INCLUDE;
161    }
162    // Check if it is a duplicate timestamp
163    if (sameAsPreviousTS(timestamp)) {
164      // If duplicate, skip this Key
165      return ScanQueryMatcher.MatchCode.SKIP;
166    }
167    int count = this.column.increment();
168    if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
169      // Done with versions for this column
170      ++this.index;
171      resetTS();
172      if (done()) {
173        // We have served all the requested columns.
174        this.column = null;
175        return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
176      }
177      // We are done with current column; advance to next column
178      // of interest.
179      this.column = this.columns[this.index];
180      return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
181    }
182    setTS(timestamp);
183    return ScanQueryMatcher.MatchCode.INCLUDE;
184  }
185
186  // Called between every row.
187  @Override
188  public void reset() {
189    this.index = 0;
190    this.column = this.columns[this.index];
191    for (ColumnCount col : this.columns) {
192      col.setCount(0);
193    }
194    resetTS();
195  }
196
197  private void resetTS() {
198    latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
199  }
200
201  private void setTS(long timestamp) {
202    latestTSOfCurrentColumn = timestamp;
203  }
204
205  private boolean sameAsPreviousTS(long timestamp) {
206    return timestamp == latestTSOfCurrentColumn;
207  }
208
209  private boolean isExpired(long timestamp) {
210    return timestamp < oldestStamp;
211  }
212
213  @Override
214  public void doneWithColumn(Cell cell) {
215    while (this.column != null) {
216      int compare = CellUtil.compareQualifiers(cell, column.getBuffer(), column.getOffset(),
217        column.getLength());
218      resetTS();
219      if (compare >= 0) {
220        ++this.index;
221        if (done()) {
222          // Will not hit any more columns in this storefile
223          this.column = null;
224        } else {
225          this.column = this.columns[this.index];
226        }
227        if (compare > 0) {
228          continue;
229        }
230      }
231      return;
232    }
233  }
234
235  @Override
236  public MatchCode getNextRowOrNextColumn(Cell cell) {
237    doneWithColumn(cell);
238
239    if (getColumnHint() == null) {
240      return MatchCode.SEEK_NEXT_ROW;
241    } else {
242      return MatchCode.SEEK_NEXT_COL;
243    }
244  }
245
246  @Override
247  public boolean isDone(long timestamp) {
248    return minVersions <= 0 && isExpired(timestamp);
249  }
250
251  @Override
252  public void beforeShipped() throws IOException {
253    // do nothing
254  }
255}