View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableSet;
27  import java.util.concurrent.locks.ReentrantLock;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.DoNotRetryIOException;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.client.Scan;
35  import org.apache.hadoop.hbase.filter.Filter;
36  import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
37  import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
38  import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
41  
42  /**
43   * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
44   * into List<KeyValue> for a single row.
45   */
46  public class StoreScanner extends NonLazyKeyValueScanner
47      implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
48    static final Log LOG = LogFactory.getLog(StoreScanner.class);
49    private Store store;
50    private ScanQueryMatcher matcher;
51    private KeyValueHeap heap;
52    private boolean cacheBlocks;
53  
54  
55    private String metricNamePrefix;
56    private String metricNamePrefixNext;
57    // Used to indicate that the scanner has closed (see HBASE-1107)
58    // Doesnt need to be volatile because it's always accessed via synchronized methods
59    private boolean closing = false;
60    private final boolean isGet;
61    private final boolean explicitColumnQuery;
62    private final boolean useRowColBloom;
63    private final Scan scan;
64    private final NavigableSet<byte[]> columns;
65    private final long oldestUnexpiredTS;
66    private final int minVersions;
67  
68    /** We don't ever expect to change this, the constant is just for clarity. */
69    static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
70  
71    /** Used during unit testing to ensure that lazy seek does save seek ops */
72    private static boolean lazySeekEnabledGlobally =
73        LAZY_SEEK_ENABLED_BY_DEFAULT;
74  
75    // if heap == null and lastTop != null, you need to reseek given the key below
76    private KeyValue lastTop = null;
77  
78    // A flag whether use pread for scan
79    private boolean scanUsePread = false;
80    private ReentrantLock lock = new ReentrantLock();
81  
82    /** An internal constructor. */
83    private StoreScanner(Store store, boolean cacheBlocks, Scan scan,
84        final NavigableSet<byte[]> columns, long ttl, int minVersions) {
85      this.store = store;
86      this.cacheBlocks = cacheBlocks;
87      isGet = scan.isGetScan();
88      int numCol = columns == null ? 0 : columns.size();
89      explicitColumnQuery = numCol > 0;
90      this.scan = scan;
91      this.columns = columns;
92      oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
93      this.minVersions = minVersions;
94  
95      // We look up row-column Bloom filters for multi-column queries as part of
96      // the seek operation. However, we also look the row-column Bloom filter
97      // for multi-row (non-"get") scans because this is not done in
98      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
99      useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
100     this.scanUsePread = scan.isSmall();
101   }
102 
103   /**
104    * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
105    * are not in a compaction.
106    *
107    * @param store who we scan
108    * @param scan the spec
109    * @param columns which columns we are scanning
110    * @throws IOException
111    */
112   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns)
113                               throws IOException {
114     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
115         scanInfo.getMinVersions());
116     initializeMetricNames();
117     if (columns != null && scan.isRaw()) {
118       throw new DoNotRetryIOException(
119           "Cannot specify any column for a raw scan");
120     }
121     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
122         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
123         oldestUnexpiredTS);
124 
125     // Pass columns to try to filter out unnecessary StoreFiles.
126     List<KeyValueScanner> scanners = getScannersNoCompaction();
127 
128     // Seek all scanners to the start of the Row (or if the exact matching row
129     // key does not exist, then to the start of the next matching Row).
130     // Always check bloom filter to optimize the top row seek for delete
131     // family marker.
132     if (explicitColumnQuery && lazySeekEnabledGlobally) {
133       for (KeyValueScanner scanner : scanners) {
134         scanner.requestSeek(matcher.getStartKey(), false, true);
135       }
136     } else {
137       for (KeyValueScanner scanner : scanners) {
138         scanner.seek(matcher.getStartKey());
139       }
140     }
141 
142     // Combine all seeked scanners with a heap
143     heap = new KeyValueHeap(scanners, store.comparator);
144 
145     this.store.addChangedReaderObserver(this);
146   }
147 
148   /**
149    * Used for major compactions.<p>
150    *
151    * Opens a scanner across specified StoreFiles.
152    * @param store who we scan
153    * @param scan the spec
154    * @param scanners ancillary scanners
155    * @param smallestReadPoint the readPoint that we should use for tracking
156    *          versions
157    */
158   public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
159       List<? extends KeyValueScanner> scanners, ScanType scanType,
160       long smallestReadPoint, long earliestPutTs) throws IOException {
161     this(store, false, scan, null, scanInfo.getTtl(),
162         scanInfo.getMinVersions());
163     initializeMetricNames();
164     matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
165         smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
166 
167     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
168     scanners = selectScannersFrom(scanners);
169 
170     // Seek all scanners to the initial key
171     for(KeyValueScanner scanner : scanners) {
172       scanner.seek(matcher.getStartKey());
173     }
174 
175     // Combine all seeked scanners with a heap
176     heap = new KeyValueHeap(scanners, store.comparator);
177   }
178 
/**
 * Constructor for testing. Delegates to the six-argument test constructor
 * with {@link HConstants#LATEST_TIMESTAMP} as the earliest put timestamp.
 */
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners) throws IOException {
  this(scan, scanInfo, scanType, columns, scanners, HConstants.LATEST_TIMESTAMP);
}
186 
/**
 * Constructor for testing. No store is attached (the store field stays
 * null); the given scanners are used as-is, without filtering, and the heap
 * is ordered by the comparator taken from scanInfo.
 */
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
    ScanType scanType, final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners, long earliestPutTs)
    throws IOException {
  this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
      scanInfo.getMinVersions());
  initializeMetricNames();
  matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
      Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);

  // Seek all scanners to the initial key
  for (KeyValueScanner kvScanner : scanners) {
    kvScanner.seek(matcher.getStartKey());
  }
  this.heap = new KeyValueHeap(scanners, scanInfo.getComparator());
}
204 
205   /**
206    * Method used internally to initialize metric names throughout the
207    * constructors.
208    *
209    * To be called after the store variable has been initialized!
210    */
211   private void initializeMetricNames() {
212     String tableName = SchemaMetrics.UNKNOWN;
213     String family = SchemaMetrics.UNKNOWN;
214     if (store != null) {
215       tableName = store.getTableName();
216       family = Bytes.toString(store.getFamily().getName());
217     }
218     this.metricNamePrefix =
219         SchemaMetrics.generateSchemaMetricsPrefix(tableName, family);
220   }
221 
222   /**
223    * Get a filtered list of scanners. Assumes we are not in a compaction.
224    * @return list of scanners to seek
225    */
226   private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
227     final boolean isCompaction = false;
228     boolean usePread = isGet || scanUsePread;
229     return selectScannersFrom(store.getScanners(cacheBlocks, usePread,
230         isCompaction, matcher));
231   }
232 
233   /**
234    * Filters the given list of scanners using Bloom filter, time range, and
235    * TTL.
236    */
237   private List<KeyValueScanner> selectScannersFrom(
238       final List<? extends KeyValueScanner> allScanners) {
239     boolean memOnly;
240     boolean filesOnly;
241     if (scan instanceof InternalScan) {
242       InternalScan iscan = (InternalScan)scan;
243       memOnly = iscan.isCheckOnlyMemStore();
244       filesOnly = iscan.isCheckOnlyStoreFiles();
245     } else {
246       memOnly = false;
247       filesOnly = false;
248     }
249 
250     List<KeyValueScanner> scanners =
251         new ArrayList<KeyValueScanner>(allScanners.size());
252 
253     // We can only exclude store files based on TTL if minVersions is set to 0.
254     // Otherwise, we might have to return KVs that have technically expired.
255     long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
256         Long.MIN_VALUE;
257 
258     // include only those scan files which pass all filters
259     for (KeyValueScanner kvs : allScanners) {
260       boolean isFile = kvs.isFileScanner();
261       if ((!isFile && filesOnly) || (isFile && memOnly)) {
262         continue;
263       }
264 
265       if (kvs.shouldUseScanner(scan, columns, expiredTimestampCutoff)) {
266         scanners.add(kvs);
267       }
268     }
269     return scanners;
270   }
271 
272   @Override
273   public KeyValue peek() {
274     lock.lock();
275     try {
276 
277     if (this.heap == null) {
278       return this.lastTop;
279     }
280     return this.heap.peek();
281     } finally {
282       lock.unlock();
283     }
284   }
285 
286   @Override
287   public KeyValue next() {
288     // throw runtime exception perhaps?
289     throw new RuntimeException("Never call StoreScanner.next()");
290   }
291 
292   @Override
293   public void close() {
294     lock.lock();
295     try {
296     if (this.closing) return;
297     this.closing = true;
298     // under test, we dont have a this.store
299     if (this.store != null)
300       this.store.deleteChangedReaderObserver(this);
301     if (this.heap != null)
302       this.heap.close();
303     this.heap = null; // CLOSED!
304     this.lastTop = null; // If both are null, we are closed.
305     } finally {
306       lock.unlock();
307     }
308   }
309 
310   @Override
311   public boolean seek(KeyValue key) throws IOException {
312     lock.lock();
313     try {
314     if (this.heap == null) {
315 
316       List<KeyValueScanner> scanners = getScannersNoCompaction();
317 
318       heap = new KeyValueHeap(scanners, store.comparator);
319     }
320 
321     return this.heap.seek(key);
322   } finally {
323     lock.unlock();
324   }
325   }
326 
327   /**
328    * Get the next row of values from this Store.
329    * @param outResult
330    * @param limit
331    * @return true if there are more rows, false if scanner is done
332    */
333   @Override
334   public boolean next(List<KeyValue> outResult, int limit) throws IOException {
335     return next(outResult, limit, null);
336   }
337 
338   /**
339    * Get the next row of values from this Store.
340    * @param outResult
341    * @param limit
342    * @return true if there are more rows, false if scanner is done
343    */
344   @Override
345   public boolean next(List<KeyValue> outResult, int limit,
346       String metric) throws IOException {
347     lock.lock();
348     try {
349 
350     if (checkReseek()) {
351       return true;
352     }
353 
354     // if the heap was left null, then the scanners had previously run out anyways, close and
355     // return.
356     if (this.heap == null) {
357       close();
358       return false;
359     }
360 
361     KeyValue peeked = this.heap.peek();
362     if (peeked == null) {
363       close();
364       return false;
365     }
366 
367     // only call setRow if the row changes; avoids confusing the query matcher
368     // if scanning intra-row
369     byte[] row = peeked.getBuffer();
370     int offset = peeked.getRowOffset();
371     short length = peeked.getRowLength();
372     if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
373       matcher.setRow(row, offset, length);
374     }
375 
376     KeyValue kv;
377     KeyValue prevKV = null;
378 
379     // Only do a sanity-check if store and comparator are available.
380     KeyValue.KVComparator comparator =
381         store != null ? store.getComparator() : null;
382 
383     long cumulativeMetric = 0;
384     int count = 0;
385     try {
386       LOOP: while((kv = this.heap.peek()) != null) {
387         // Check that the heap gives us KVs in an increasing order.
388         assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
389           "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
390         prevKV = kv;
391         ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
392         switch(qcode) {
393           case INCLUDE:
394           case INCLUDE_AND_SEEK_NEXT_ROW:
395           case INCLUDE_AND_SEEK_NEXT_COL:
396 
397             Filter f = matcher.getFilter();
398             outResult.add(f == null ? kv : f.transform(kv));
399             count++;
400 
401             if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
402               if (!matcher.moreRowsMayExistAfter(kv)) {
403                 return false;
404               }
405               reseek(matcher.getKeyForNextRow(kv));
406             } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
407               reseek(matcher.getKeyForNextColumn(kv));
408             } else {
409               this.heap.next();
410             }
411 
412             cumulativeMetric += kv.getLength();
413             if (limit > 0 && (count == limit)) {
414               break LOOP;
415             }
416             continue;
417 
418           case DONE:
419             return true;
420 
421           case DONE_SCAN:
422             close();
423 
424             return false;
425 
426           case SEEK_NEXT_ROW:
427             // This is just a relatively simple end of scan fix, to short-cut end
428             // us if there is an endKey in the scan.
429             if (!matcher.moreRowsMayExistAfter(kv)) {
430               return false;
431             }
432 
433             reseek(matcher.getKeyForNextRow(kv));
434             break;
435 
436           case SEEK_NEXT_COL:
437             reseek(matcher.getKeyForNextColumn(kv));
438             break;
439 
440           case SKIP:
441             this.heap.next();
442             break;
443 
444           case SEEK_NEXT_USING_HINT:
445             KeyValue nextKV = matcher.getNextKeyHint(kv);
446             if (nextKV != null) {
447               reseek(nextKV);
448             } else {
449               heap.next();
450             }
451             break;
452 
453           default:
454             throw new RuntimeException("UNEXPECTED");
455         }
456       }
457     } finally {
458       if (cumulativeMetric > 0 && metric != null) {
459         // OK to use identity here
460         if (metric == SchemaMetrics.METRIC_NEXTSIZE) {
461           if (metricNamePrefixNext == null) metricNamePrefixNext = metricNamePrefix + metric;
462           RegionMetricsStorage.incrNumericMetric(metricNamePrefixNext, cumulativeMetric);
463         } else {
464           RegionMetricsStorage.incrNumericMetric(metricNamePrefix + metric, cumulativeMetric);
465         }
466       }
467     }
468 
469     if (count > 0) {
470       return true;
471     }
472 
473     // No more keys
474     close();
475     return false;
476     } finally {
477       lock.unlock();
478     }
479   }
480 
481   @Override
482   public boolean next(List<KeyValue> outResult) throws IOException {
483     return next(outResult, -1, null);
484   }
485 
486   @Override
487   public boolean next(List<KeyValue> outResult, String metric)
488       throws IOException {
489     return next(outResult, -1, metric);
490   }
491 
492   // Implementation of ChangedReadersObserver
493   @Override
494   public void updateReaders() throws IOException {
495     lock.lock();
496     try {
497     if (this.closing) return;
498 
499     // All public synchronized API calls will call 'checkReseek' which will cause
500     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
501     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
502     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
503     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
504     if (this.heap == null) return;
505 
506     // this could be null.
507     this.lastTop = this.peek();
508 
509     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
510 
511     // close scanners to old obsolete Store files
512     this.heap.close(); // bubble thru and close all scanners.
513     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
514 
515     // Let the next() call handle re-creating and seeking
516     } finally {
517       lock.unlock();
518     }
519   }
520 
521   /**
522    * @return true if top of heap has changed (and KeyValueHeap has to try the
523    *         next KV)
524    * @throws IOException
525    */
526   private boolean checkReseek() throws IOException {
527     if (this.heap == null && this.lastTop != null) {
528       resetScannerStack(this.lastTop);
529       if (this.heap.peek() == null
530           || store.comparator.compareRows(this.lastTop, this.heap.peek()) != 0) {
531         LOG.debug("Storescanner.peek() is changed where before = "
532             + this.lastTop.toString() + ",and after = " + this.heap.peek());
533         this.lastTop = null;
534         return true;
535       }
536       this.lastTop = null; // gone!
537     }
538     // else dont need to reseek
539     return false;
540   }
541 
542   private void resetScannerStack(KeyValue lastTopKey) throws IOException {
543     if (heap != null) {
544       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
545     }
546 
547     /* When we have the scan object, should we not pass it to getScanners()
548      * to get a limited set of scanners? We did so in the constructor and we
549      * could have done it now by storing the scan object from the constructor */
550     List<KeyValueScanner> scanners = getScannersNoCompaction();
551 
552     for(KeyValueScanner scanner : scanners) {
553       scanner.seek(lastTopKey);
554     }
555 
556     // Combine all seeked scanners with a heap
557     heap = new KeyValueHeap(scanners, store.comparator);
558 
559     // Reset the state of the Query Matcher and set to top row.
560     // Only reset and call setRow if the row changes; avoids confusing the
561     // query matcher if scanning intra-row.
562     KeyValue kv = heap.peek();
563     if (kv == null) {
564       kv = lastTopKey;
565     }
566     byte[] row = kv.getBuffer();
567     int offset = kv.getRowOffset();
568     short length = kv.getRowLength();
569     if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
570       matcher.reset();
571       matcher.setRow(row, offset, length);
572     }
573   }
574 
575   @Override
576   public boolean reseek(KeyValue kv) throws IOException {
577     lock.lock();
578     try {
579     //Heap will not be null, if this is called from next() which.
580     //If called from RegionScanner.reseek(...) make sure the scanner
581     //stack is reset if needed.
582     checkReseek();
583     if (explicitColumnQuery && lazySeekEnabledGlobally) {
584       return heap.requestSeek(kv, true, useRowColBloom);
585     } else {
586       return heap.reseek(kv);
587     }
588     } finally {
589       lock.unlock();
590     }
591   }
592 
593   @Override
594   public long getSequenceID() {
595     return 0;
596   }
597 
598   /**
599    * Used in testing.
600    * @return all scanners in no particular order
601    */
602   List<KeyValueScanner> getAllScannersForTesting() {
603     List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
604     KeyValueScanner current = heap.getCurrentForTesting();
605     if (current != null)
606       allScanners.add(current);
607     for (KeyValueScanner scanner : heap.getHeap())
608       allScanners.add(scanner);
609     return allScanners;
610   }
611 
612   static void enableLazySeekGlobally(boolean enable) {
613     lazySeekEnabledGlobally = enable;
614   }
615 }
616