View Javadoc

1   /*
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.util.NavigableSet;
23  
24  import org.apache.hadoop.hbase.HConstants;
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
27  import org.apache.hadoop.hbase.util.Bytes;
28  
29  /**
30   * This class is used for the tracking and enforcement of columns and numbers
31   * of versions during the course of a Get or Scan operation, when explicit
32   * column qualifiers have been asked for in the query.
33   *
34   * With a little magic (see {@link ScanQueryMatcher}), we can use this matcher
35   * for both scans and gets.  The main difference is 'next' and 'done' collapse
36   * for the scan case (since we see all columns in order), and we only reset
37   * between rows.
38   *
39   * <p>
40   * This class is utilized by {@link ScanQueryMatcher} mainly through two methods:
41   * <ul><li>{@link #checkColumn} is called when a Put satisfies all other
42   * conditions of the query.
43   * <ul><li>{@link #getNextRowOrNextColumn} is called whenever ScanQueryMatcher
44   * believes that the current column should be skipped (by timestamp, filter etc.)
45   * <p>
46   * These two methods returns a 
47   * {@link org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode}
48   * to define what action should be taken.
49   * <p>
50   * This class is NOT thread-safe as queries are never multi-threaded
51   */
52  public class ExplicitColumnTracker implements ColumnTracker {
53  
54    private final int maxVersions;
55    private final int minVersions;
56  
57    // hint for the tracker about how many KVs we will attempt to search via next()
58    // before we schedule a (re)seek operation
59    private final int lookAhead; 
60  
61   /**
62    * Contains the list of columns that the ExplicitColumnTracker is tracking.
63    * Each ColumnCount instance also tracks how many versions of the requested
64    * column have been returned.
65    */
66    private final ColumnCount[] columns;
67    private int index;
68    private ColumnCount column;
69    /** Keeps track of the latest timestamp included for current column.
70     * Used to eliminate duplicates. */
71    private long latestTSOfCurrentColumn;
72    private long oldestStamp;
73    private int skipCount;
74  
75    /**
76     * Default constructor.
77     * @param columns columns specified user in query
78     * @param minVersions minimum number of versions to keep
79     * @param maxVersions maximum versions to return per column
80     * @param oldestUnexpiredTS the oldest timestamp we are interested in,
81     *  based on TTL 
82     * @param lookAhead number of KeyValues to look ahead via next before
83     *  (re)seeking
84     */
85    public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
86        int maxVersions, long oldestUnexpiredTS, int lookAhead) {
87      this.maxVersions = maxVersions;
88      this.minVersions = minVersions;
89      this.lookAhead = lookAhead;
90      this.oldestStamp = oldestUnexpiredTS;
91      this.columns = new ColumnCount[columns.size()];
92      int i=0;
93      for(byte [] column : columns) {
94        this.columns[i++] = new ColumnCount(column);
95      }
96      reset();
97    }
98  
99      /**
100    * Done when there are no more columns to match against.
101    */
102   public boolean done() {
103     return this.index >= columns.length;
104   }
105 
106   public ColumnCount getColumnHint() {
107     return this.column;
108   }
109 
110   /**
111    * {@inheritDoc}
112    */
113   @Override
114   public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
115       int length, byte type) {
116     // delete markers should never be passed to an
117     // *Explicit*ColumnTracker
118     assert !KeyValue.isDelete(type);
119     do {
120       // No more columns left, we are done with this query
121       if(done()) {
122         return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
123       }
124 
125       // No more columns to match against, done with storefile
126       if(this.column == null) {
127         return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
128       }
129 
130       // Compare specific column to current column
131       int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(),
132           column.getLength(), bytes, offset, length);
133 
134       // Column Matches. Return include code. The caller would call checkVersions
135       // to limit the number of versions.
136       if(ret == 0) {
137         return ScanQueryMatcher.MatchCode.INCLUDE;
138       }
139 
140       resetTS();
141 
142       if (ret > 0) {
143         // The current KV is smaller than the column the ExplicitColumnTracker
144         // is interested in, so seek to that column of interest.
145         return this.skipCount++ < this.lookAhead ? ScanQueryMatcher.MatchCode.SKIP
146             : ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
147       }
148 
149       // The current KV is bigger than the column the ExplicitColumnTracker
150       // is interested in. That means there is no more data for the column
151       // of interest. Advance the ExplicitColumnTracker state to next
152       // column of interest, and check again.
153       if (ret <= -1) {
154         ++this.index;
155         this.skipCount = 0;
156         if (done()) {
157           // No more to match, do not include, done with this row.
158           return ScanQueryMatcher.MatchCode.SEEK_NEXT_ROW; // done_row
159         }
160         // This is the recursive case.
161         this.column = this.columns[this.index];
162       }
163     } while(true);
164   }
165 
166   @Override
167   public ScanQueryMatcher.MatchCode checkVersions(byte[] bytes, int offset, int length,
168       long timestamp, byte type, boolean ignoreCount) {
169     assert !KeyValue.isDelete(type);
170     if (ignoreCount) return ScanQueryMatcher.MatchCode.INCLUDE;
171     // Check if it is a duplicate timestamp
172     if (sameAsPreviousTS(timestamp)) {
173       // If duplicate, skip this Key
174       return ScanQueryMatcher.MatchCode.SKIP;
175     }
176     int count = this.column.increment();
177     if (count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
178       // Done with versions for this column
179       ++this.index;
180       this.skipCount = 0;
181       resetTS();
182       if (done()) {
183         // We have served all the requested columns.
184         this.column = null;
185         return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
186       }
187       // We are done with current column; advance to next column
188       // of interest.
189       this.column = this.columns[this.index];
190       return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
191     }
192     setTS(timestamp);
193     return ScanQueryMatcher.MatchCode.INCLUDE;
194   }
195 
196   // Called between every row.
197   public void reset() {
198     this.index = 0;
199     this.skipCount = 0;
200     this.column = this.columns[this.index];
201     for(ColumnCount col : this.columns) {
202       col.setCount(0);
203     }
204     resetTS();
205   }
206 
207   private void resetTS() {
208     latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
209   }
210 
211   private void setTS(long timestamp) {
212     latestTSOfCurrentColumn = timestamp;
213   }
214 
215   private boolean sameAsPreviousTS(long timestamp) {
216     return timestamp == latestTSOfCurrentColumn;
217   }
218 
219   private boolean isExpired(long timestamp) {
220     return timestamp < oldestStamp;
221   }
222 
223   /**
224    * This method is used to inform the column tracker that we are done with
225    * this column. We may get this information from external filters or
226    * timestamp range and we then need to indicate this information to
227    * tracker. It is required only in case of ExplicitColumnTracker.
228    * @param bytes
229    * @param offset
230    * @param length
231    */
232   public void doneWithColumn(byte [] bytes, int offset, int length) {
233     while (this.column != null) {
234       int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(),
235           column.getLength(), bytes, offset, length);
236       resetTS();
237       if (compare <= 0) {
238         ++this.index;
239         this.skipCount = 0;
240         if (done()) {
241           // Will not hit any more columns in this storefile
242           this.column = null;
243         } else {
244           this.column = this.columns[this.index];
245         }
246         if (compare <= -1)
247           continue;
248       }
249       return;
250     }
251   }
252 
253   public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
254       int qualLength) {
255     doneWithColumn(bytes, offset,qualLength);
256 
257     if (getColumnHint() == null) {
258       return MatchCode.SEEK_NEXT_ROW;
259     } else {
260       return MatchCode.SEEK_NEXT_COL;
261     }
262   }
263 
264   public boolean isDone(long timestamp) {
265     return minVersions <= 0 && isExpired(timestamp);
266   }
267 }