001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.querymatcher;
019
020import java.io.IOException;
021import java.util.NavigableSet;
022import org.apache.hadoop.hbase.Cell;
023import org.apache.hadoop.hbase.CellUtil;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.PrivateCellUtil;
026import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
027import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * This class is used for the tracking and enforcement of columns and numbers of versions during the
031 * course of a Get or Scan operation, when explicit column qualifiers have been asked for in the
032 * query. With a little magic (see {@link ScanQueryMatcher}), we can use this matcher for both scans
033 * and gets. The main difference is 'next' and 'done' collapse for the scan case (since we see all
034 * columns in order), and we only reset between rows.
035 * <p>
036 * This class is utilized by {@link ScanQueryMatcher} mainly through two methods:
037 * <ul>
038 * <li>{@link #checkColumn} is called when a Put satisfies all other conditions of the query.</li>
039 * <li>{@link #getNextRowOrNextColumn} is called whenever ScanQueryMatcher believes that the current
040 * column should be skipped (by timestamp, filter etc.)</li>
041 * </ul>
042 * <p>
043 * These two methods returns a
044 * {@link org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode} to define
045 * what action should be taken.
046 * <p>
047 * This class is NOT thread-safe as queries are never multi-threaded
048 */
049@InterfaceAudience.Private
050public class ExplicitColumnTracker implements ColumnTracker {
051
052  private final int maxVersions;
053  private final int minVersions;
054
055  /**
056   * Contains the list of columns that the ExplicitColumnTracker is tracking. Each ColumnCount
057   * instance also tracks how many versions of the requested column have been returned.
058   */
059  private final ColumnCount[] columns;
060  private int index;
061  private ColumnCount column;
062  /**
063   * Keeps track of the latest timestamp included for current column. Used to eliminate duplicates.
064   */
065  private long latestTSOfCurrentColumn;
066  private long oldestStamp;
067
068  /**
069   * Default constructor.
070   * @param columns           columns specified user in query
071   * @param minVersions       minimum number of versions to keep
072   * @param maxVersions       maximum versions to return per column
073   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
074   */
075  public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions, int maxVersions,
076    long oldestUnexpiredTS) {
077    this.maxVersions = maxVersions;
078    this.minVersions = minVersions;
079    this.oldestStamp = oldestUnexpiredTS;
080    this.columns = new ColumnCount[columns.size()];
081    int i = 0;
082    for (byte[] column : columns) {
083      this.columns[i++] = new ColumnCount(column);
084    }
085    reset();
086  }
087
088  /**
089   * Done when there are no more columns to match against.
090   */
091  @Override
092  public boolean done() {
093    return this.index >= columns.length;
094  }
095
096  @Override
097  public ColumnCount getColumnHint() {
098    return this.column;
099  }
100
101  /**
102   * {@inheritDoc}
103   */
104  @Override
105  public ScanQueryMatcher.MatchCode checkColumn(Cell cell, byte type) {
106    // delete markers should never be passed to an
107    // *Explicit*ColumnTracker
108    assert !PrivateCellUtil.isDelete(type);
109    do {
110      // No more columns left, we are done with this query
111      if (done()) {
112        return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
113      }
114
115      // No more columns to match against, done with storefile
116      if (this.column == null) {
117        return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
118      }
119
120      // Compare specific column to current column
121      int ret = CellUtil.compareQualifiers(cell, column.getBuffer(), column.getOffset(),
122        column.getLength());
123
124      // Column Matches. Return include code. The caller would call checkVersions
125      // to limit the number of versions.
126      if (ret == 0) {
127        return ScanQueryMatcher.MatchCode.INCLUDE;
128      }
129
130      resetTS();
131
132      if (ret < 0) {
133        // The current KV is smaller than the column the ExplicitColumnTracker
134        // is interested in, so seek to that column of interest.
135        return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
136      }
137
138      // The current KV is bigger than the column the ExplicitColumnTracker
139      // is interested in. That means there is no more data for the column
140      // of interest. Advance the ExplicitColumnTracker state to next
141      // column of interest, and check again.
142      if (ret > 0) {
143        ++this.index;
144        if (done()) {
145          // No more to match, do not include, done with this row.
146          return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
147        }
148        // This is the recursive case.
149        this.column = this.columns[this.index];
150      }
151    } while (true);
152  }
153
154  @Override
155  public ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type,
156    boolean ignoreCount) throws IOException {
157    assert !PrivateCellUtil.isDelete(type);
158    if (ignoreCount) {
159      return ScanQueryMatcher.MatchCode.INCLUDE;
160    }
161    // Check if it is a duplicate timestamp
162    if (sameAsPreviousTS(timestamp)) {
163      // If duplicate, skip this Key
164      return ScanQueryMatcher.MatchCode.SKIP;
165    }
166    int count = this.column.increment();
167    if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
168      // Done with versions for this column
169      ++this.index;
170      resetTS();
171      if (done()) {
172        // We have served all the requested columns.
173        this.column = null;
174        return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
175      }
176      // We are done with current column; advance to next column
177      // of interest.
178      this.column = this.columns[this.index];
179      return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
180    }
181    setTS(timestamp);
182    return ScanQueryMatcher.MatchCode.INCLUDE;
183  }
184
185  // Called between every row.
186  @Override
187  public void reset() {
188    this.index = 0;
189    this.column = this.columns[this.index];
190    for (ColumnCount col : this.columns) {
191      col.setCount(0);
192    }
193    resetTS();
194  }
195
196  private void resetTS() {
197    latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
198  }
199
200  private void setTS(long timestamp) {
201    latestTSOfCurrentColumn = timestamp;
202  }
203
204  private boolean sameAsPreviousTS(long timestamp) {
205    return timestamp == latestTSOfCurrentColumn;
206  }
207
208  private boolean isExpired(long timestamp) {
209    return timestamp < oldestStamp;
210  }
211
212  @Override
213  public void doneWithColumn(Cell cell) {
214    while (this.column != null) {
215      int compare = CellUtil.compareQualifiers(cell, column.getBuffer(), column.getOffset(),
216        column.getLength());
217      resetTS();
218      if (compare >= 0) {
219        ++this.index;
220        if (done()) {
221          // Will not hit any more columns in this storefile
222          this.column = null;
223        } else {
224          this.column = this.columns[this.index];
225        }
226        if (compare > 0) {
227          continue;
228        }
229      }
230      return;
231    }
232  }
233
234  @Override
235  public MatchCode getNextRowOrNextColumn(Cell cell) {
236    doneWithColumn(cell);
237
238    if (getColumnHint() == null) {
239      return MatchCode.SEEK_NEXT_ROW;
240    } else {
241      return MatchCode.SEEK_NEXT_COL;
242    }
243  }
244
245  @Override
246  public boolean isDone(long timestamp) {
247    return minVersions <= 0 && isExpired(timestamp);
248  }
249
250  @Override
251  public void beforeShipped() throws IOException {
252    // do nothing
253  }
254}