1 /*
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.NavigableSet;
23
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.CellUtil;
26 import org.apache.hadoop.hbase.HConstants;
27 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
28 import org.apache.hadoop.hbase.util.Bytes;
29
30 /**
31 * This class is used for the tracking and enforcement of columns and numbers
32 * of versions during the course of a Get or Scan operation, when explicit
33 * column qualifiers have been asked for in the query.
34 *
35 * With a little magic (see {@link ScanQueryMatcher}), we can use this matcher
36 * for both scans and gets. The main difference is 'next' and 'done' collapse
37 * for the scan case (since we see all columns in order), and we only reset
38 * between rows.
39 *
40 * <p>
41 * This class is utilized by {@link ScanQueryMatcher} mainly through two methods:
42 * <ul><li>{@link #checkColumn} is called when a Put satisfies all other
43 * conditions of the query.</li>
44 * <li>{@link #getNextRowOrNextColumn} is called whenever ScanQueryMatcher
45 * believes that the current column should be skipped (by timestamp, filter etc.)</li>
46 * </ul>
47 * <p>
48 * These two methods returns a
49 * {@link org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode}
50 * to define what action should be taken.
51 * <p>
52 * This class is NOT thread-safe as queries are never multi-threaded
53 */
54 @InterfaceAudience.Private
55 public class ExplicitColumnTracker implements ColumnTracker {
56
57 private final int maxVersions;
58 private final int minVersions;
59
60 /**
61 * Contains the list of columns that the ExplicitColumnTracker is tracking.
62 * Each ColumnCount instance also tracks how many versions of the requested
63 * column have been returned.
64 */
65 private final ColumnCount[] columns;
66 private int index;
67 private ColumnCount column;
68 /** Keeps track of the latest timestamp included for current column.
69 * Used to eliminate duplicates. */
70 private long latestTSOfCurrentColumn;
71 private long oldestStamp;
72
73 /**
74 * Default constructor.
75 * @param columns columns specified user in query
76 * @param minVersions minimum number of versions to keep
77 * @param maxVersions maximum versions to return per column
78 * @param oldestUnexpiredTS the oldest timestamp we are interested in,
79 * based on TTL
80 */
81 public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
82 int maxVersions, long oldestUnexpiredTS) {
83 this.maxVersions = maxVersions;
84 this.minVersions = minVersions;
85 this.oldestStamp = oldestUnexpiredTS;
86 this.columns = new ColumnCount[columns.size()];
87 int i=0;
88 for(byte [] column : columns) {
89 this.columns[i++] = new ColumnCount(column);
90 }
91 reset();
92 }
93
94 /**
95 * Done when there are no more columns to match against.
96 */
97 public boolean done() {
98 return this.index >= columns.length;
99 }
100
101 public ColumnCount getColumnHint() {
102 return this.column;
103 }
104
105 /**
106 * {@inheritDoc}
107 */
108 @Override
109 public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
110 int length, byte type) {
111 // delete markers should never be passed to an
112 // *Explicit*ColumnTracker
113 assert !CellUtil.isDelete(type);
114 do {
115 // No more columns left, we are done with this query
116 if(done()) {
117 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
118 }
119
120 // No more columns to match against, done with storefile
121 if(this.column == null) {
122 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
123 }
124
125 // Compare specific column to current column
126 int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(),
127 column.getLength(), bytes, offset, length);
128
129 // Column Matches. Return include code. The caller would call checkVersions
130 // to limit the number of versions.
131 if(ret == 0) {
132 return ScanQueryMatcher.MatchCode.INCLUDE;
133 }
134
135 resetTS();
136
137 if (ret > 0) {
138 // The current KV is smaller than the column the ExplicitColumnTracker
139 // is interested in, so seek to that column of interest.
140 return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
141 }
142
143 // The current KV is bigger than the column the ExplicitColumnTracker
144 // is interested in. That means there is no more data for the column
145 // of interest. Advance the ExplicitColumnTracker state to next
146 // column of interest, and check again.
147 ++this.index;
148 if (done()) {
149 // No more to match, do not include, done with this row.
150 return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
151 }
152 // This is the recursive case.
153 this.column = this.columns[this.index];
154 } while(true);
155 }
156
157 @Override
158 public ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length,
159 long timestamp, byte type, boolean ignoreCount) throws IOException {
160 assert !CellUtil.isDelete(type);
161 if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
162 // Check if it is a duplicate timestamp
163 if (sameAsPreviousTS(timestamp)) {
164 // If duplicate, skip this Key
165 return ScanQueryMatcher.MatchCode.SKIP;
166 }
167 int count = this.column.increment();
168 if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
169 // Done with versions for this column
170 ++this.index;
171 resetTS();
172 if (done()) {
173 // We have served all the requested columns.
174 this.column = null;
175 return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
176 }
177 // We are done with current column; advance to next column
178 // of interest.
179 this.column = this.columns[this.index];
180 return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
181 }
182 setTS(timestamp);
183 return ScanQueryMatcher.MatchCode.INCLUDE;
184 }
185
186 // Called between every row.
187 public void reset() {
188 this.index = 0;
189 this.column = this.columns[this.index];
190 for(ColumnCount col : this.columns) {
191 col.setCount(0);
192 }
193 resetTS();
194 }
195
196 private void resetTS() {
197 latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
198 }
199
200 private void setTS(long timestamp) {
201 latestTSOfCurrentColumn = timestamp;
202 }
203
204 private boolean sameAsPreviousTS(long timestamp) {
205 return timestamp == latestTSOfCurrentColumn;
206 }
207
208 private boolean isExpired(long timestamp) {
209 return timestamp < oldestStamp;
210 }
211
212 /**
213 * This method is used to inform the column tracker that we are done with
214 * this column. We may get this information from external filters or
215 * timestamp range and we then need to indicate this information to
216 * tracker. It is required only in case of ExplicitColumnTracker.
217 * @param bytes
218 * @param offset
219 * @param length
220 */
221 public void doneWithColumn(byte [] bytes, int offset, int length) {
222 while (this.column != null) {
223 int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(),
224 column.getLength(), bytes, offset, length);
225 resetTS();
226 if (compare <= 0) {
227 ++this.index;
228 if (done()) {
229 // Will not hit any more columns in this storefile
230 this.column = null;
231 } else {
232 this.column = this.columns[this.index];
233 }
234 if (compare <= -1)
235 continue;
236 }
237 return;
238 }
239 }
240
241 public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
242 int qualLength) {
243 doneWithColumn(bytes, offset,qualLength);
244
245 if (getColumnHint() == null) {
246 return MatchCode.SEEK_NEXT_ROW;
247 } else {
248 return MatchCode.SEEK_NEXT_COL;
249 }
250 }
251
252 public boolean isDone(long timestamp) {
253 return minVersions <= 0 && isExpired(timestamp);
254 }
255 }