View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.io.IOException;
24  import java.util.NavigableSet;
25  
26  import org.apache.hadoop.hbase.HConstants;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.client.Scan;
29  import org.apache.hadoop.hbase.filter.Filter;
30  import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
31  import org.apache.hadoop.hbase.io.TimeRange;
32  import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
35  
/**
 * A query matcher that is specifically designed for the scan case.
 * <p>
 * One instance is pinned to a single row at a time (via {@link #setRow});
 * {@link #match} is then called for each KeyValue of that row, in sort order,
 * and answers what the scanner should do with it.
 * <p>
 * Not thread-safe: all state below is per-scan and mutated without locking.
 */
public class ScanQueryMatcher {
  // Optimization so we can skip lots of compares when we decide to skip
  // to the next row: once set, match() short-circuits to SEEK_NEXT_ROW
  // until reset() clears it.
  private boolean stickyNextRow;
  /** Exclusive stop row of the scan; empty array means "no stop row". */
  private final byte[] stopRow;

  /** Timestamp range of the scan; KVs outside it are skipped. */
  private final TimeRange tr;

  /** Optional user filter from the Scan; may be null. */
  private final Filter filter;

  /** Keeps track of deletes */
  private final DeleteTracker deletes;

  /*
   * The following three booleans define how we deal with deletes.
   * There are three different aspects:
   * 1. Whether to keep delete markers. This is used in compactions.
   *    Minor compactions always keep delete markers.
   * 2. Whether to keep deleted rows. This is also used in compactions,
   *    if the store is set to keep deleted rows. This implies keeping
   *    the delete markers as well.
   *    In this case deleted rows are subject to the normal max version
   *    and TTL/min version rules just like "normal" rows.
   * 3. Whether a scan can do time travel queries even before deleted
   *    marker to reach deleted rows.
   */
  /** whether to retain delete markers (minor compaction or raw scan) */
  private final boolean retainDeletesInOutput;
  /** whether to return deleted rows (KEEP_DELETED_CELLS store or raw scan) */
  private final boolean keepDeletedCells;
  /** whether time range queries can see rows "behind" a delete */
  private final boolean seePastDeleteMarkers;


  /** Keeps track of columns and versions */
  private final ColumnTracker columns;

  /** Key to seek to in memstore and StoreFiles */
  private final KeyValue startKey;

  /** Row comparator for the region this query is for */
  private final KeyValue.KeyComparator rowComparator;

  /* row is not private for tests */
  /** Row the query is on; (row, rowOffset, rowLength) reference a shared buffer */
  byte [] row;
  int rowOffset;
  short rowLength;

  /**
   * Oldest put in any of the involved store files
   * Used to decide whether it is ok to delete
   * family delete marker of this store keeps
   * deleted KVs.
   */
  private final long earliestPutTs;

  /** readPoint over which the KVs are unconditionally included */
  protected long maxReadPointToTrackVersions;

  /**
   * This variable shows whether there is an null column in the query. There
   * always exists a null column in the wildcard column query.
   * There maybe exists a null column in the explicit column query based on the
   * first column.
   * */
  private boolean hasNullColumn = true;

  // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete
  // marker is always removed during a major compaction. If set to non-zero
  // value then major compaction will try to keep a delete marker around for
  // the given number of milliseconds. We want to keep the delete markers
  // around a bit longer because old puts might appear out-of-order. For
  // example, during log replication between two clusters.
  //
  // If the delete marker has lived longer than its column-family's TTL then
  // the delete marker will be removed even if time.to.purge.deletes has not
  // passed. This is because all the Puts that this delete marker can influence
  // would have also expired. (Removing of delete markers on col family TTL will
  // not happen if min-versions is set to non-zero)
  //
  // But, if time.to.purge.deletes has not expired then a delete
  // marker will not be removed just because there are no Puts that it is
  // currently influencing. This is because Puts, that this delete can
  // influence.  may appear out of order.
  private final long timeToPurgeDeletes;

  /** true when scanType == ScanType.USER_SCAN (vs. a flush/compaction scan) */
  private final boolean isUserScan;

128   /**
129    * Construct a QueryMatcher for a scan
130    * @param scan
131    * @param scanInfo The store's immutable scan info
132    * @param columns
133    * @param scanType Type of the scan
134    * @param earliestPutTs Earliest put seen in any of the store files.
135    * @param oldestUnexpiredTS the oldest timestamp we are interested in,
136    *  based on TTL
137    */
138   public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
139       NavigableSet<byte[]> columns, ScanType scanType,
140       long readPointToUse, long earliestPutTs, long oldestUnexpiredTS) {
141     this.tr = scan.getTimeRange();
142     this.rowComparator = scanInfo.getComparator().getRawComparator();
143     this.deletes =  new ScanDeleteTracker();
144     this.stopRow = scan.getStopRow();
145     this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(),
146         scanInfo.getFamily());
147     this.filter = scan.getFilter();
148     this.earliestPutTs = earliestPutTs;
149     this.maxReadPointToTrackVersions = readPointToUse;
150     this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
151 
152     /* how to deal with deletes */
153     this.isUserScan = scanType == ScanType.USER_SCAN;
154     // keep deleted cells: if compaction or raw scan
155     this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && !isUserScan) || scan.isRaw();
156     // retain deletes: if minor compaction or raw scan
157     this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();
158     // seePastDeleteMarker: user initiated scans
159     this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && isUserScan;
160 
161     int maxVersions =
162         scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(),
163           scanInfo.getMaxVersions());
164 
165     // Single branch to deal with two types of reads (columns vs all in family)
166     if (columns == null || columns.size() == 0) {
167       // there is always a null column in the wildcard column query.
168       hasNullColumn = true;
169 
170       // use a specialized scan for wildcard column tracker.
171       this.columns = new ScanWildcardColumnTracker(
172           scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
173     } else {
174       // whether there is null column in the explicit column query
175       hasNullColumn = (columns.first().length == 0);
176 
177       // We can share the ExplicitColumnTracker, diff is we reset
178       // between rows, not between storefiles.
179       byte[] attr = scan.getAttribute(Scan.HINT_LOOKAHEAD);
180       this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
181           oldestUnexpiredTS, attr == null ? 0 : Bytes.toInt(attr));
182     }
183   }
184 
  /*
   * Constructor for tests: a user scan with an unbounded read point and
   * the latest possible earliestPutTs, so read-point and earliest-put
   * pruning are effectively disabled.
   */
  ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
      NavigableSet<byte[]> columns, long oldestUnexpiredTS) {
    this(scan, scanInfo, columns, ScanType.USER_SCAN,
          Long.MAX_VALUE, /* max Readpoint to track versions */
        HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS);
  }
194 
195   /**
196    *
197    * @return  whether there is an null column in the query
198    */
199   public boolean hasNullColumnInQuery() {
200     return hasNullColumn;
201   }
202 
  /**
   * Determines if the caller should do one of several things:
   * - seek/skip to the next row (MatchCode.SEEK_NEXT_ROW)
   * - seek/skip to the next column (MatchCode.SEEK_NEXT_COL)
   * - include the current KeyValue (MatchCode.INCLUDE)
   * - ignore the current KeyValue (MatchCode.SKIP)
   * - got to the next row (MatchCode.DONE)
   *
   * The KeyValue's key is decoded directly from its backing buffer
   * (row, family, qualifier, timestamp, type) rather than through
   * accessor calls, to avoid per-KV object churn on this hot path.
   *
   * @param kv KeyValue to check
   * @return The match code instance.
   * @throws IOException in case there is an internal consistency problem
   *      caused by a data corruption.
   */
  public MatchCode match(KeyValue kv) throws IOException {
    // A filter may declare the entire scan finished.
    if (filter != null && filter.filterAllRemaining()) {
      return MatchCode.DONE_SCAN;
    }

    byte [] bytes = kv.getBuffer();
    int offset = kv.getOffset();

    // First SIZEOF_INT bytes hold the key length; the key itself begins
    // ROW_OFFSET bytes into the KV buffer.
    int keyLength = Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
    offset += KeyValue.ROW_OFFSET;

    int initialOffset = offset; // start of the key portion

    short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
    offset += Bytes.SIZEOF_SHORT;

    // Compare the KV's row with the row this matcher is pinned to
    // (set via setRow()).
    int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
        bytes, offset, rowLength);
    if (ret <= -1) {
      // KV belongs to a later row: the current row is exhausted.
      return MatchCode.DONE;
    } else if (ret >= 1) {
      // KV sorts before the current row.
      // could optimize this, if necessary?
      // Could also be called SEEK_TO_CURRENT_ROW, but this
      // should be rare/never happens.
      return MatchCode.SEEK_NEXT_ROW;
    }

    // optimize case: an earlier call already decided the rest of this
    // row can be skipped.
    if (this.stickyNextRow)
        return MatchCode.SEEK_NEXT_ROW;

    if (this.columns.done()) {
      stickyNextRow = true;
      return MatchCode.SEEK_NEXT_ROW;
    }

    //Passing rowLength
    offset += rowLength;

    //Skipping family (1 length byte + the family bytes themselves)
    byte familyLength = bytes [offset];
    offset += familyLength + 1;

    // Remainder of the key after row+family is qualifier then ts+type.
    int qualLength = keyLength -
      (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;

    long timestamp = Bytes.toLong(bytes, initialOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE);
    // check for early out based on timestamp alone
    if (columns.isDone(timestamp)) {
        return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
    }

    /*
     * The delete logic is pretty complicated now.
     * This is corroborated by the following:
     * 1. The store might be instructed to keep deleted rows around.
     * 2. A scan can optionally see past a delete marker now.
     * 3. If deleted rows are kept, we have to find out when we can
     *    remove the delete markers.
     * 4. Family delete markers are always first (regardless of their TS)
     * 5. Delete markers should not be counted as version
     * 6. Delete markers affect puts of the *same* TS
     * 7. Delete marker need to be version counted together with puts
     *    they affect
     */
    byte type = bytes[initialOffset + keyLength - 1];
    if (kv.isDelete()) {
      if (!keepDeletedCells) {
        // first ignore delete markers if the scanner can do so, and the
        // range does not include the marker
        //
        // during flushes and compactions also ignore delete markers newer
        // than the readpoint of any open scanner, this prevents deleted
        // rows that could still be seen by a scanner from being collected
        boolean includeDeleteMarker = seePastDeleteMarkers ?
            tr.withinTimeRange(timestamp) :
            tr.withinOrAfterTimeRange(timestamp);
        if (includeDeleteMarker
            && kv.getMemstoreTS() <= maxReadPointToTrackVersions) {
          this.deletes.add(bytes, offset, qualLength, timestamp, type);
        }
        // Can't early out now, because DelFam come before any other keys
      }
      if ((!isUserScan)
          && timeToPurgeDeletes > 0
          && (EnvironmentEdgeManager.currentTimeMillis() - timestamp) <= timeToPurgeDeletes) {
        // Compaction scan and the marker is younger than
        // time.to.purge.deletes: keep it regardless.
        return MatchCode.INCLUDE;
      } else if (retainDeletesInOutput || kv.getMemstoreTS() > maxReadPointToTrackVersions) {
        // always include or it is not time yet to check whether it is OK
        // to purge deltes or not
        if (!isUserScan) {
          // if this is not a user scan (compaction), we can filter this deletemarker right here
          // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
          return MatchCode.INCLUDE;
        }
      } else if (keepDeletedCells) {
        if (timestamp < earliestPutTs) {
          // keeping delete rows, but there are no puts older than
          // this delete in the store files.
          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
        }
        // else: fall through and do version counting on the
        // delete markers
      } else {
        return MatchCode.SKIP;
      }
      // note the following next else if...
      // delete marker are not subject to other delete markers
    } else if (!this.deletes.isEmpty()) {
      DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
          timestamp);
      switch (deleteResult) {
        case FAMILY_DELETED:
        case COLUMN_DELETED:
          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
        case VERSION_DELETED:
          return MatchCode.SKIP;
        case NOT_DELETED:
          break;
        default:
          throw new RuntimeException("UNEXPECTED");
        }
    }

    // Time-range check: too new -> SKIP (an older version may still be in
    // range); too old -> nothing further in this column can be in range.
    int timestampComparison = tr.compare(timestamp);
    if (timestampComparison >= 1) {
      return MatchCode.SKIP;
    } else if (timestampComparison <= -1) {
      return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
    }

    // STEP 1: Check if the column is part of the requested columns
    MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, type);
    if (colChecker == MatchCode.INCLUDE) {
      ReturnCode filterResponse = ReturnCode.SKIP;
      // STEP 2: Yes, the column is part of the requested columns. Check if filter is present
      if (filter != null) {
        // STEP 3: Filter the key value and return if it filters out
        filterResponse = filter.filterKeyValue(kv);
        switch (filterResponse) {
        case SKIP:
          return MatchCode.SKIP;
        case NEXT_COL:
          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
        case NEXT_ROW:
          stickyNextRow = true;
          return MatchCode.SEEK_NEXT_ROW;
        case SEEK_NEXT_USING_HINT:
          return MatchCode.SEEK_NEXT_USING_HINT;
        default:
          //It means it is either include or include and seek next
          break;
        }
      }
      /*
       * STEP 4: Reaching this step means the column is part of the requested columns and either
       * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
       * Now check the number of versions needed. This method call returns SKIP, INCLUDE,
       * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
       *
       * FilterResponse            ColumnChecker               Desired behavior
       * INCLUDE                   SKIP                        row has already been included, SKIP.
       * INCLUDE                   INCLUDE                     INCLUDE
       * INCLUDE                   INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
       * INCLUDE                   INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
       * INCLUDE_AND_SEEK_NEXT_COL SKIP                        row has already been included, SKIP.
       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE                     INCLUDE_AND_SEEK_NEXT_COL
       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
       *
       * In all the above scenarios, we return the column checker return value except for
       * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
       */
      colChecker =
          columns.checkVersions(bytes, offset, qualLength, timestamp, type,
            kv.getMemstoreTS() > maxReadPointToTrackVersions);
      //Optimize with stickyNextRow
      stickyNextRow = colChecker == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? true : stickyNextRow;
      return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
          colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
          : colChecker;
    }
    stickyNextRow = (colChecker == MatchCode.SEEK_NEXT_ROW) ? true
        : stickyNextRow;
    return colChecker;
  }
402 
403   public boolean moreRowsMayExistAfter(KeyValue kv) {
404     if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
405         rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
406             kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) {
407       // KV >= STOPROW
408       // then NO there is nothing left.
409       return false;
410     } else {
411       return true;
412     }
413   }
414 
415   /**
416    * Set current row
417    * @param row
418    */
419   public void setRow(byte [] row, int offset, short length) {
420     this.row = row;
421     this.rowOffset = offset;
422     this.rowLength = length;
423     reset();
424   }
425 
426   public void reset() {
427     this.deletes.reset();
428     this.columns.reset();
429 
430     stickyNextRow = false;
431   }
432 
433   /**
434    *
435    * @return the start key
436    */
437   public KeyValue getStartKey() {
438     return this.startKey;
439   }
440 
441   /**
442    *
443    * @return the Filter
444    */
445   Filter getFilter() {
446     return this.filter;
447   }
448 
449   public KeyValue getNextKeyHint(KeyValue kv) {
450     if (filter == null) {
451       return null;
452     } else {
453       return filter.getNextKeyHint(kv);
454     }
455   }
456 
457   public KeyValue getKeyForNextColumn(KeyValue kv) {
458     ColumnCount nextColumn = columns.getColumnHint();
459     if (nextColumn == null) {
460       return KeyValue.createLastOnRow(
461           kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
462           kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
463           kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
464     } else {
465       return KeyValue.createFirstOnRow(
466           kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
467           kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
468           nextColumn.getBuffer(), nextColumn.getOffset(), nextColumn.getLength());
469     }
470   }
471 
472   public KeyValue getKeyForNextRow(KeyValue kv) {
473     return KeyValue.createLastOnRow(
474         kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
475         null, 0, 0,
476         null, 0, 0);
477   }
478 
479   // Used only for testing purposes
480   static MatchCode checkColumn(ColumnTracker columnTracker, byte[] bytes, int offset, int length,
481       long ttl, byte type, boolean ignoreCount) throws IOException {
482     MatchCode matchCode = columnTracker.checkColumn(bytes, offset, length, type);
483     if (matchCode == MatchCode.INCLUDE) {
484       return columnTracker.checkVersions(bytes, offset, length, ttl, type, ignoreCount);
485     }
486     return matchCode;
487   }
488 
  /**
   * {@link #match} return codes.  These instruct the scanner moving through
   * memstores and StoreFiles what to do with the current KeyValue.
   * <p>
   * Additionally, this contains "early-out" language to tell the scanner to
   * move on to the next File (memstore or Storefile), or to return immediately.
   * <p>
   * NOTE: constant order is part of the public surface of this enum; do not
   * reorder.
   */
  public static enum MatchCode {
    /**
     * Include KeyValue in the returned result
     */
    INCLUDE,

    /**
     * Do not include KeyValue in the returned result
     */
    SKIP,

    /**
     * Do not include, jump to next StoreFile or memstore (in time order)
     */
    NEXT,

    /**
     * Do not include, return current result
     */
    DONE,

    /**
     * These codes are used by the ScanQueryMatcher
     */

    /**
     * Done with the current row, seek to the next one.
     */
    SEEK_NEXT_ROW,
    /**
     * Done with the current column, seek to the next one.
     */
    SEEK_NEXT_COL,

    /**
     * Done with scan, thanks to the row filter.
     */
    DONE_SCAN,

    /*
     * Seek to next key which is given as hint (by the filter).
     */
    SEEK_NEXT_USING_HINT,

    /**
     * Include KeyValue and done with column, seek to next.
     */
    INCLUDE_AND_SEEK_NEXT_COL,

    /**
     * Include KeyValue and done with row, seek to next.
     */
    INCLUDE_AND_SEEK_NEXT_ROW,
  }
550 }