1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.DataInput;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.util.Arrays;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.Comparator;
29  import java.util.Map;
30  import java.util.SortedSet;
31  import java.util.UUID;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FSDataInputStream;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
43  import org.apache.hadoop.hbase.KeyValue;
44  import org.apache.hadoop.hbase.KeyValue.KVComparator;
45  import org.apache.hadoop.hbase.KeyValue.MetaKeyComparator;
46  import org.apache.hadoop.hbase.client.Scan;
47  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
48  import org.apache.hadoop.hbase.io.compress.Compression;
49  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
50  import org.apache.hadoop.hbase.io.hfile.BlockType;
51  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
52  import org.apache.hadoop.hbase.io.hfile.HFile;
53  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
54  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
55  import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
56  import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
57  import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
58  import org.apache.hadoop.hbase.util.BloomFilter;
59  import org.apache.hadoop.hbase.util.BloomFilterFactory;
60  import org.apache.hadoop.hbase.util.BloomFilterWriter;
61  import org.apache.hadoop.hbase.util.Bytes;
62  import org.apache.hadoop.hbase.util.ChecksumType;
63  import org.apache.hadoop.hbase.util.Writables;
64  import org.apache.hadoop.io.RawComparator;
65  import org.apache.hadoop.io.WritableUtils;
66  
67  import com.google.common.base.Function;
68  import com.google.common.base.Preconditions;
69  import com.google.common.collect.ImmutableList;
70  import com.google.common.collect.Ordering;
71  
72  /**
73   * A Store data file.  Stores usually have one or more of these files.  They
74   * are produced by flushing the memstore to disk.  To
75   * create, instantiate a writer using {@link StoreFile.WriterBuilder}
76   * and append data. Be sure to add any metadata before calling close on the
77   * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
78   * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
79   * passing filesystem and path.  To read, call {@link #createReader()}.
80   * <p>StoreFiles may also reference store files in another Store.
81   *
82   * The reason for this weird pattern where you use a different instance for the
83   * writer and a reader is that we write once but read a lot more.
84   */
85  @InterfaceAudience.LimitedPrivate("Coprocessor")
86  public class StoreFile {
87    static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
88  
89    // Keys for fileinfo values in HFile
90  
91    /** Max Sequence ID in FileInfo */
92    public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
93  
94    /** Major compaction flag in FileInfo */
95    public static final byte[] MAJOR_COMPACTION_KEY =
96        Bytes.toBytes("MAJOR_COMPACTION_KEY");
97  
/** Exclude-from-minor-compaction flag in FileInfo */
99    public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
100       Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
101 
102   /** Bloom filter Type in FileInfo */
103   public static final byte[] BLOOM_FILTER_TYPE_KEY =
104       Bytes.toBytes("BLOOM_FILTER_TYPE");
105 
106   /** Delete Family Count in FileInfo */
107   public static final byte[] DELETE_FAMILY_COUNT =
108       Bytes.toBytes("DELETE_FAMILY_COUNT");
109 
110   /** Last Bloom filter key in FileInfo */
111   private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
112 
113   /** Key for Timerange information in metadata*/
114   public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
115 
116   /** Key for timestamp of earliest-put in metadata*/
117   public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
118 
119   // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
120   // Need to make it 8k for testing.
121   public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
122 
123   private final StoreFileInfo fileInfo;
124   private final FileSystem fs;
125 
126   // Block cache configuration and reference.
127   private final CacheConfig cacheConf;
128 
129   // What kind of data block encoding will be used
130   private final HFileDataBlockEncoder dataBlockEncoder;
131 
132   // Keys for metadata stored in backing HFile.
133   // Set when we obtain a Reader.
134   private long sequenceid = -1;
135 
136   // max of the MemstoreTS in the KV's in this store
137   // Set when we obtain a Reader.
138   private long maxMemstoreTS = -1;
139 
140   public long getMaxMemstoreTS() {
141     return maxMemstoreTS;
142   }
143 
144   public void setMaxMemstoreTS(long maxMemstoreTS) {
145     this.maxMemstoreTS = maxMemstoreTS;
146   }
147 
148   // If true, this file was product of a major compaction.  Its then set
149   // whenever you get a Reader.
150   private AtomicBoolean majorCompaction = null;
151 
152   // If true, this file should not be included in minor compactions.
153   // It's set whenever you get a Reader.
154   private boolean excludeFromMinorCompaction = false;
155 
156   /** Meta key set when store file is a result of a bulk load */
157   public static final byte[] BULKLOAD_TASK_KEY =
158     Bytes.toBytes("BULKLOAD_SOURCE_TASK");
159   public static final byte[] BULKLOAD_TIME_KEY =
160     Bytes.toBytes("BULKLOAD_TIMESTAMP");
161 
162   /**
163    * Map of the metadata entries in the corresponding HFile
164    */
165   private Map<byte[], byte[]> metadataMap;
166 
167   // StoreFile.Reader
168   private volatile Reader reader;
169 
170   /**
171    * Bloom filter type specified in column family configuration. Does not
172    * necessarily correspond to the Bloom filter type present in the HFile.
173    */
174   private final BloomType cfBloomType;
175 
176   // the last modification time stamp
177   private long modificationTimeStamp = 0L;
178 
/**
 * Convenience constructor: wraps the given path in a {@link StoreFileInfo}
 * and delegates to the {@link StoreFileInfo}-based constructor. The reader
 * is not opened here; call {@link #createReader()} for that.
 *
 * @param fs  The current file system to use.
 * @param p  The path of the file.
 * @param conf  The current configuration.
 * @param cacheConf  The cache configuration and block cache reference.
 * @param cfBloomType The Bloom type requested by the column family
 *          configuration. It may differ from the Bloom filter type actually
 *          stored in the HFile because the column family configuration
 *          might have changed since the file was written.
 * @param dataBlockEncoder data block encoding algorithm; may be null.
 * @throws IOException if building the {@link StoreFileInfo} or the
 *           delegated constructor fails.
 */
public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
    final CacheConfig cacheConf, final BloomType cfBloomType,
    final HFileDataBlockEncoder dataBlockEncoder) throws IOException {
  this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, dataBlockEncoder);
}
200 
201 
202   /**
203    * Constructor, loads a reader and it's indices, etc. May allocate a
204    * substantial amount of ram depending on the underlying files (10-20MB?).
205    *
206    * @param fs  The current file system to use.
207    * @param fileInfo  The store file information.
208    * @param conf  The current configuration.
209    * @param cacheConf  The cache configuration and block cache reference.
210    * @param cfBloomType The bloom type to use for this store file as specified
211    *          by column family configuration. This may or may not be the same
212    *          as the Bloom filter type actually present in the HFile, because
213    *          column family configuration might change. If this is
214    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
215    * @param dataBlockEncoder data block encoding algorithm.
216    * @throws IOException When opening the reader fails.
217    */
218   public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
219       final CacheConfig cacheConf,  final BloomType cfBloomType,
220       final HFileDataBlockEncoder dataBlockEncoder) throws IOException {
221     this.fs = fs;
222     this.fileInfo = fileInfo;
223     this.cacheConf = cacheConf;
224     this.dataBlockEncoder =
225         dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE
226             : dataBlockEncoder;
227 
228     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
229       this.cfBloomType = cfBloomType;
230     } else {
231       LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
232           "cfBloomType=" + cfBloomType + " (disabled in config)");
233       this.cfBloomType = BloomType.NONE;
234     }
235 
236     // cache the modification time stamp of this store file
237     this.modificationTimeStamp = fileInfo.getModificationTime();
238   }
239 
240   /**
241    * @return Path or null if this StoreFile was made with a Stream.
242    */
243   public Path getPath() {
244     return this.fileInfo.getPath();
245   }
246 
247   /**
248    * @return True if this is a StoreFile Reference; call after {@link #open()}
249    * else may get wrong answer.
250    */
251   public boolean isReference() {
252     return this.fileInfo.isReference();
253   }
254 
255   /**
256    * @return True if this file was made by a major compaction.
257    */
258   public boolean isMajorCompaction() {
259     if (this.majorCompaction == null) {
260       throw new NullPointerException("This has not been set yet");
261     }
262     return this.majorCompaction.get();
263   }
264 
265   /**
266    * @return True if this file should not be part of a minor compaction.
267    */
268   public boolean excludeFromMinorCompaction() {
269     return this.excludeFromMinorCompaction;
270   }
271 
272   /**
273    * @return This files maximum edit sequence id.
274    */
275   public long getMaxSequenceId() {
276     return this.sequenceid;
277   }
278 
279   public long getModificationTimeStamp() {
280     return modificationTimeStamp;
281   }
282 
283   /**
284    * Return the largest memstoreTS found across all storefiles in
285    * the given list. Store files that were created by a mapreduce
286    * bulk load are ignored, as they do not correspond to any specific
287    * put operation, and thus do not have a memstoreTS associated with them.
288    * @return 0 if no non-bulk-load files are provided or, this is Store that
289    * does not yet have any store files.
290    */
291   public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
292     long max = 0;
293     for (StoreFile sf : sfs) {
294       if (!sf.isBulkLoadResult()) {
295         max = Math.max(max, sf.getMaxMemstoreTS());
296       }
297     }
298     return max;
299   }
300 
301   /**
302    * Return the highest sequence ID found across all storefiles in
303    * the given list. Store files that were created by a mapreduce
304    * bulk load are ignored, as they do not correspond to any edit
305    * log items.
306    * @param sfs
307    * @param includeBulkLoadedFiles
308    * @return 0 if no non-bulk-load files are provided or, this is Store that
309    * does not yet have any store files.
310    */
311   public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
312       boolean includeBulkLoadedFiles) {
313     long max = 0;
314     for (StoreFile sf : sfs) {
315       if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
316         max = Math.max(max, sf.getMaxSequenceId());
317       }
318     }
319     return max;
320   }
321 
322   /**
323    * @return true if this storefile was created by HFileOutputFormat
324    * for a bulk load.
325    */
326   boolean isBulkLoadResult() {
327     return metadataMap.containsKey(BULKLOAD_TIME_KEY);
328   }
329 
330   /**
331    * Return the timestamp at which this bulk load file was generated.
332    */
333   public long getBulkLoadTimestamp() {
334     return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
335   }
336 
337   /**
338    * @return the cached value of HDFS blocks distribution. The cached value is
339    * calculated when store file is opened.
340    */
341   public HDFSBlocksDistribution getHDFSBlockDistribution() {
342     return this.fileInfo.getHDFSBlockDistribution();
343   }
344 
345   /**
346    * Opens reader on this store file.  Called by Constructor.
347    * @return Reader for the store file.
348    * @throws IOException
349    * @see #closeReader(boolean)
350    */
351   private Reader open() throws IOException {
352     if (this.reader != null) {
353       throw new IllegalAccessError("Already open");
354     }
355 
356     // Open the StoreFile.Reader
357     this.reader = fileInfo.open(this.fs, this.cacheConf, dataBlockEncoder.getEncodingInCache());
358 
359     // Load up indices and fileinfo. This also loads Bloom filter type.
360     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
361 
362     // Read in our metadata.
363     byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
364     if (b != null) {
365       // By convention, if halfhfile, top half has a sequence number > bottom
366       // half. Thats why we add one in below. Its done for case the two halves
367       // are ever merged back together --rare.  Without it, on open of store,
368       // since store files are distinguished by sequence id, the one half would
369       // subsume the other.
370       this.sequenceid = Bytes.toLong(b);
371       if (fileInfo.isTopReference()) {
372         this.sequenceid += 1;
373       }
374     }
375 
376     if (isBulkLoadResult()){
377       // generate the sequenceId from the fileName
378       // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
379       String fileName = this.getPath().getName();
380       int startPos = fileName.indexOf("SeqId_");
381       if (startPos != -1) {
382         this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
383             fileName.indexOf('_', startPos + 6)));
384         // Handle reference files as done above.
385         if (fileInfo.isTopReference()) {
386           this.sequenceid += 1;
387         }
388       }
389     }
390     this.reader.setSequenceID(this.sequenceid);
391 
392     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
393     if (b != null) {
394       this.maxMemstoreTS = Bytes.toLong(b);
395     }
396 
397     b = metadataMap.get(MAJOR_COMPACTION_KEY);
398     if (b != null) {
399       boolean mc = Bytes.toBoolean(b);
400       if (this.majorCompaction == null) {
401         this.majorCompaction = new AtomicBoolean(mc);
402       } else {
403         this.majorCompaction.set(mc);
404       }
405     } else {
406       // Presume it is not major compacted if it doesn't explicity say so
407       // HFileOutputFormat explicitly sets the major compacted key.
408       this.majorCompaction = new AtomicBoolean(false);
409     }
410 
411     b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
412     this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
413 
414     BloomType hfileBloomType = reader.getBloomFilterType();
415     if (cfBloomType != BloomType.NONE) {
416       reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
417       if (hfileBloomType != cfBloomType) {
418         LOG.info("HFile Bloom filter type for "
419             + reader.getHFileReader().getName() + ": " + hfileBloomType
420             + ", but " + cfBloomType + " specified in column family "
421             + "configuration");
422       }
423     } else if (hfileBloomType != BloomType.NONE) {
424       LOG.info("Bloom filter turned off by CF config for "
425           + reader.getHFileReader().getName());
426     }
427 
428     // load delete family bloom filter
429     reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
430 
431     try {
432       byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
433       if (timerangeBytes != null) {
434         this.reader.timeRangeTracker = new TimeRangeTracker();
435         Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
436       }
437     } catch (IllegalArgumentException e) {
438       LOG.error("Error reading timestamp range data from meta -- " +
439           "proceeding without", e);
440       this.reader.timeRangeTracker = null;
441     }
442     return this.reader;
443   }
444 
445   /**
446    * @return Reader for StoreFile. creates if necessary
447    * @throws IOException
448    */
449   public Reader createReader() throws IOException {
450     if (this.reader == null) {
451       try {
452         this.reader = open();
453       } catch (IOException e) {
454         try {
455           this.closeReader(true);
456         } catch (IOException ee) {
457         }
458         throw e;
459       }
460 
461     }
462     return this.reader;
463   }
464 
465   /**
466    * @return Current reader.  Must call createReader first else returns null.
467    * @see #createReader()
468    */
469   public Reader getReader() {
470     return this.reader;
471   }
472 
473   /**
474    * @param evictOnClose whether to evict blocks belonging to this file
475    * @throws IOException
476    */
477   public synchronized void closeReader(boolean evictOnClose)
478       throws IOException {
479     if (this.reader != null) {
480       this.reader.close(evictOnClose);
481       this.reader = null;
482     }
483   }
484 
485   /**
486    * Delete this file
487    * @throws IOException
488    */
489   public void deleteReader() throws IOException {
490     closeReader(true);
491     this.fs.delete(getPath(), true);
492   }
493 
494   @Override
495   public String toString() {
496     return this.fileInfo.toString();
497   }
498 
499   /**
500    * @return a length description of this StoreFile, suitable for debug output
501    */
502   public String toStringDetailed() {
503     StringBuilder sb = new StringBuilder();
504     sb.append(this.getPath().toString());
505     sb.append(", isReference=").append(isReference());
506     sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
507     if (isBulkLoadResult()) {
508       sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
509     } else {
510       sb.append(", seqid=").append(getMaxSequenceId());
511     }
512     sb.append(", majorCompaction=").append(isMajorCompaction());
513 
514     return sb.toString();
515   }
516 
517   public static class WriterBuilder {
518     private final Configuration conf;
519     private final CacheConfig cacheConf;
520     private final FileSystem fs;
521     private final int blockSize;
522 
523     private Compression.Algorithm compressAlgo =
524         HFile.DEFAULT_COMPRESSION_ALGORITHM;
525     private HFileDataBlockEncoder dataBlockEncoder =
526         NoOpDataBlockEncoder.INSTANCE;
527     private KeyValue.KVComparator comparator = KeyValue.COMPARATOR;
528     private BloomType bloomType = BloomType.NONE;
529     private long maxKeyCount = 0;
530     private Path dir;
531     private Path filePath;
532     private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
533     private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
534     private boolean includeMVCCReadpoint = true;
535 
536     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
537         FileSystem fs, int blockSize) {
538       this.conf = conf;
539       this.cacheConf = cacheConf;
540       this.fs = fs;
541       this.blockSize = blockSize;
542     }
543 
544     /**
545      * Use either this method or {@link #withFilePath}, but not both.
546      * @param dir Path to column family directory. The directory is created if
547      *          does not exist. The file is given a unique name within this
548      *          directory.
549      * @return this (for chained invocation)
550      */
551     public WriterBuilder withOutputDir(Path dir) {
552       Preconditions.checkNotNull(dir);
553       this.dir = dir;
554       return this;
555     }
556 
557     /**
558      * Use either this method or {@link #withOutputDir}, but not both.
559      * @param filePath the StoreFile path to write
560      * @return this (for chained invocation)
561      */
562     public WriterBuilder withFilePath(Path filePath) {
563       Preconditions.checkNotNull(filePath);
564       this.filePath = filePath;
565       return this;
566     }
567 
568     public WriterBuilder withCompression(Compression.Algorithm compressAlgo) {
569       Preconditions.checkNotNull(compressAlgo);
570       this.compressAlgo = compressAlgo;
571       return this;
572     }
573 
574     public WriterBuilder withDataBlockEncoder(HFileDataBlockEncoder encoder) {
575       Preconditions.checkNotNull(encoder);
576       this.dataBlockEncoder = encoder;
577       return this;
578     }
579 
580     public WriterBuilder withComparator(KeyValue.KVComparator comparator) {
581       Preconditions.checkNotNull(comparator);
582       this.comparator = comparator;
583       return this;
584     }
585 
586     public WriterBuilder withBloomType(BloomType bloomType) {
587       Preconditions.checkNotNull(bloomType);
588       this.bloomType = bloomType;
589       return this;
590     }
591 
592     /**
593      * @param maxKeyCount estimated maximum number of keys we expect to add
594      * @return this (for chained invocation)
595      */
596     public WriterBuilder withMaxKeyCount(long maxKeyCount) {
597       this.maxKeyCount = maxKeyCount;
598       return this;
599     }
600 
601     /**
602      * @param checksumType the type of checksum
603      * @return this (for chained invocation)
604      */
605     public WriterBuilder withChecksumType(ChecksumType checksumType) {
606       this.checksumType = checksumType;
607       return this;
608     }
609 
610     /**
611      * @param bytesPerChecksum the number of bytes per checksum chunk
612      * @return this (for chained invocation)
613      */
614     public WriterBuilder withBytesPerChecksum(int bytesPerChecksum) {
615       this.bytesPerChecksum = bytesPerChecksum;
616       return this;
617     }
618 
619     /**
620      * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
621      * @return this (for chained invocation)
622      */
623     public WriterBuilder includeMVCCReadpoint(boolean includeMVCCReadpoint) {
624       this.includeMVCCReadpoint = includeMVCCReadpoint;
625       return this;
626     }
627 
628     /**
629      * Create a store file writer. Client is responsible for closing file when
630      * done. If metadata, add BEFORE closing using
631      * {@link Writer#appendMetadata}.
632      */
633     public Writer build() throws IOException {
634       if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
635         throw new IllegalArgumentException("Either specify parent directory " +
636             "or file path");
637       }
638 
639       if (dir == null) {
640         dir = filePath.getParent();
641       }
642 
643       if (!fs.exists(dir)) {
644         fs.mkdirs(dir);
645       }
646 
647       if (filePath == null) {
648         filePath = getUniqueFile(fs, dir);
649         if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
650           bloomType = BloomType.NONE;
651         }
652       }
653 
654       if (compressAlgo == null) {
655         compressAlgo = HFile.DEFAULT_COMPRESSION_ALGORITHM;
656       }
657       if (comparator == null) {
658         comparator = KeyValue.COMPARATOR;
659       }
660       return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
661           conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType,
662           bytesPerChecksum, includeMVCCReadpoint);
663     }
664   }
665 
666   /**
667    * @param fs
668    * @param dir Directory to create file in.
669    * @return random filename inside passed <code>dir</code>
670    */
671   public static Path getUniqueFile(final FileSystem fs, final Path dir)
672       throws IOException {
673     if (!fs.getFileStatus(dir).isDir()) {
674       throw new IOException("Expecting " + dir.toString() +
675         " to be a directory");
676     }
677     return new Path(dir, UUID.randomUUID().toString().replaceAll("-", ""));
678   }
679 
680   public Long getMinimumTimestamp() {
681     return (getReader().timeRangeTracker == null) ?
682         null :
683         getReader().timeRangeTracker.minimumTimestamp;
684   }
685 
686   /**
687    * Gets the approximate mid-point of this file that is optimal for use in splitting it.
688    * @param comparator Comparator used to compare KVs.
689    * @return The split point row, or null if splitting is not possible, or reader is null.
690    */
691   byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
692     if (this.reader == null) {
693       LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
694       return null;
695     }
696     // Get first, last, and mid keys.  Midkey is the key that starts block
697     // in middle of hfile.  Has column and timestamp.  Need to return just
698     // the row we want to split on as midkey.
699     byte [] midkey = this.reader.midkey();
700     if (midkey != null) {
701       KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
702       byte [] fk = this.reader.getFirstKey();
703       KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
704       byte [] lk = this.reader.getLastKey();
705       KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
706       // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
707       if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
708         if (LOG.isDebugEnabled()) {
709           LOG.debug("cannot split because midkey is the same as first or last row");
710         }
711         return null;
712       }
713       return mk.getRow();
714     }
715     return null;
716   }
717 
/**
 * A StoreFile writer.  Use this to write HBase Store Files.  Instances are
 * obtained through {@link StoreFile.WriterBuilder}.
 */
722   public static class Writer implements Compactor.CellSink {
723     private final BloomFilterWriter generalBloomFilterWriter;
724     private final BloomFilterWriter deleteFamilyBloomFilterWriter;
725     private final BloomType bloomType;
726     private byte[] lastBloomKey;
727     private int lastBloomKeyOffset, lastBloomKeyLen;
728     private KVComparator kvComparator;
729     private KeyValue lastKv = null;
730     private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
731     private KeyValue lastDeleteFamilyKV = null;
732     private long deleteFamilyCnt = 0;
733 
734     protected HFileDataBlockEncoder dataBlockEncoder;
735 
736     /** Checksum type */
737     protected ChecksumType checksumType;
738 
739     /** Bytes per Checksum */
740     protected int bytesPerChecksum;
741     
742     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
743     /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
744      * When flushing a memstore, we set TimeRange and use this variable to
745      * indicate that it doesn't need to be calculated again while
746      * appending KeyValues.
747      * It is not set in cases of compactions when it is recalculated using only
748      * the appended KeyValues*/
749     boolean isTimeRangeTrackerSet = false;
750 
751     protected HFile.Writer writer;
752 
753     /**
754      * Creates an HFile.Writer that also write helpful meta data.
755      * @param fs file system to write to
756      * @param path file name to create
757      * @param blocksize HDFS block size
758      * @param compress HDFS block compression
759      * @param conf user configuration
760      * @param comparator key comparator
761      * @param bloomType bloom filter setting
762      * @param maxKeys the expected maximum number of keys to be added. Was used
763      *        for Bloom filter size in {@link HFile} format version 1.
764      * @param checksumType the checksum type
765      * @param bytesPerChecksum the number of bytes per checksum value
766      * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
767      * @throws IOException problem writing to FS
768      */
769     private Writer(FileSystem fs, Path path, int blocksize,
770         Compression.Algorithm compress,
771         HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
772         CacheConfig cacheConf,
773         final KVComparator comparator, BloomType bloomType, long maxKeys,
774         final ChecksumType checksumType, final int bytesPerChecksum,
775         final boolean includeMVCCReadpoint) throws IOException {
776       this.dataBlockEncoder = dataBlockEncoder != null ?
777           dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
778       writer = HFile.getWriterFactory(conf, cacheConf)
779           .withPath(fs, path)
780           .withBlockSize(blocksize)
781           .withCompression(compress)
782           .withDataBlockEncoder(this.dataBlockEncoder)
783           .withComparator(comparator.getRawComparator())
784           .withChecksumType(checksumType)
785           .withBytesPerChecksum(bytesPerChecksum)
786           .includeMVCCReadpoint(includeMVCCReadpoint)
787           .create();
788 
789       this.kvComparator = comparator;
790 
791       generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
792           conf, cacheConf, bloomType,
793           (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
794 
795       if (generalBloomFilterWriter != null) {
796         this.bloomType = bloomType;
797         LOG.info("Bloom filter type for " + path + ": " + this.bloomType + ", "
798             + generalBloomFilterWriter.getClass().getSimpleName());
799       } else {
800         // Not using Bloom filters.
801         this.bloomType = BloomType.NONE;
802       }
803 
804       // initialize delete family Bloom filter when there is NO RowCol Bloom
805       // filter
806       if (this.bloomType != BloomType.ROWCOL) {
807         this.deleteFamilyBloomFilterWriter = BloomFilterFactory
808             .createDeleteBloomAtWrite(conf, cacheConf,
809                 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
810       } else {
811         deleteFamilyBloomFilterWriter = null;
812       }
813       if (deleteFamilyBloomFilterWriter != null) {
814         LOG.info("Delete Family Bloom filter type for " + path + ": "
815             + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
816       }
817       this.checksumType = checksumType;
818       this.bytesPerChecksum = bytesPerChecksum;
819     }
820 
821     /**
822      * Writes meta data.
823      * Call before {@link #close()} since its written as meta data to this file.
824      * @param maxSequenceId Maximum sequence id.
825      * @param majorCompaction True if this file is product of a major compaction
826      * @throws IOException problem writing to FS
827      */
828     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
829     throws IOException {
830       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
831       writer.appendFileInfo(MAJOR_COMPACTION_KEY,
832           Bytes.toBytes(majorCompaction));
833       appendTrackedTimestampsToMetadata();
834     }
835 
836     /**
837      * Add TimestampRange and earliest put timestamp to Metadata
838      */
839     public void appendTrackedTimestampsToMetadata() throws IOException {
840       appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
841       appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
842     }
843 
844     /**
845      * Set TimeRangeTracker
846      * @param trt
847      */
848     public void setTimeRangeTracker(final TimeRangeTracker trt) {
849       this.timeRangeTracker = trt;
850       isTimeRangeTrackerSet = true;
851     }
852 
853     /**
854      * Record the earlest Put timestamp.
855      *
856      * If the timeRangeTracker is not set,
857      * update TimeRangeTracker to include the timestamp of this key
858      * @param kv
859      */
860     public void trackTimestamps(final KeyValue kv) {
861       if (KeyValue.Type.Put.getCode() == kv.getType()) {
862         earliestPutTs = Math.min(earliestPutTs, kv.getTimestamp());
863       }
864       if (!isTimeRangeTrackerSet) {
865         timeRangeTracker.includeTimestamp(kv);
866       }
867     }
868 
869     private void appendGeneralBloomfilter(final KeyValue kv) throws IOException {
870       if (this.generalBloomFilterWriter != null) {
871         // only add to the bloom filter on a new, unique key
872         boolean newKey = true;
873         if (this.lastKv != null) {
874           switch(bloomType) {
875           case ROW:
876             newKey = ! kvComparator.matchingRows(kv, lastKv);
877             break;
878           case ROWCOL:
879             newKey = ! kvComparator.matchingRowColumn(kv, lastKv);
880             break;
881           case NONE:
882             newKey = false;
883             break;
884           default:
885             throw new IOException("Invalid Bloom filter type: " + bloomType +
886                 " (ROW or ROWCOL expected)");
887           }
888         }
889         if (newKey) {
890           /*
891            * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
892            * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
893            *
894            * 2 Types of Filtering:
895            *  1. Row = Row
896            *  2. RowCol = Row + Qualifier
897            */
898           byte[] bloomKey;
899           int bloomKeyOffset, bloomKeyLen;
900 
901           switch (bloomType) {
902           case ROW:
903             bloomKey = kv.getBuffer();
904             bloomKeyOffset = kv.getRowOffset();
905             bloomKeyLen = kv.getRowLength();
906             break;
907           case ROWCOL:
908             // merge(row, qualifier)
909             // TODO: could save one buffer copy in case of compound Bloom
910             // filters when this involves creating a KeyValue
911             bloomKey = generalBloomFilterWriter.createBloomKey(kv.getBuffer(),
912                 kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
913                 kv.getQualifierOffset(), kv.getQualifierLength());
914             bloomKeyOffset = 0;
915             bloomKeyLen = bloomKey.length;
916             break;
917           default:
918             throw new IOException("Invalid Bloom filter type: " + bloomType +
919                 " (ROW or ROWCOL expected)");
920           }
921           generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
922           if (lastBloomKey != null
923               && generalBloomFilterWriter.getComparator().compare(bloomKey,
924                   bloomKeyOffset, bloomKeyLen, lastBloomKey,
925                   lastBloomKeyOffset, lastBloomKeyLen) <= 0) {
926             throw new IOException("Non-increasing Bloom keys: "
927                 + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen)
928                 + " after "
929                 + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset,
930                     lastBloomKeyLen));
931           }
932           lastBloomKey = bloomKey;
933           lastBloomKeyOffset = bloomKeyOffset;
934           lastBloomKeyLen = bloomKeyLen;
935           this.lastKv = kv;
936         }
937       }
938     }
939 
940     private void appendDeleteFamilyBloomFilter(final KeyValue kv)
941         throws IOException {
942       if (!kv.isDeleteFamily()) {
943         return;
944       }
945 
946       // increase the number of delete family in the store file
947       deleteFamilyCnt++;
948       if (null != this.deleteFamilyBloomFilterWriter) {
949         boolean newKey = true;
950         if (lastDeleteFamilyKV != null) {
951           newKey = !kvComparator.matchingRows(kv, lastDeleteFamilyKV);
952         }
953         if (newKey) {
954           this.deleteFamilyBloomFilterWriter.add(kv.getBuffer(),
955               kv.getRowOffset(), kv.getRowLength());
956           this.lastDeleteFamilyKV = kv;
957         }
958       }
959     }
960 
961     public void append(final KeyValue kv) throws IOException {
962       appendGeneralBloomfilter(kv);
963       appendDeleteFamilyBloomFilter(kv);
964       writer.append(kv);
965       trackTimestamps(kv);
966     }
967 
968     public Path getPath() {
969       return this.writer.getPath();
970     }
971 
972     boolean hasGeneralBloom() {
973       return this.generalBloomFilterWriter != null;
974     }
975 
976     /**
977      * For unit testing only.
978      *
979      * @return the Bloom filter used by this writer.
980      */
981     BloomFilterWriter getGeneralBloomWriter() {
982       return generalBloomFilterWriter;
983     }
984 
985     private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
986       boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
987       if (haveBloom) {
988         bfw.compactBloom();
989       }
990       return haveBloom;
991     }
992 
993     private boolean closeGeneralBloomFilter() throws IOException {
994       boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);
995 
996       // add the general Bloom filter writer and append file info
997       if (hasGeneralBloom) {
998         writer.addGeneralBloomFilter(generalBloomFilterWriter);
999         writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
1000             Bytes.toBytes(bloomType.toString()));
1001         if (lastBloomKey != null) {
1002           writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
1003               lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
1004                   + lastBloomKeyLen));
1005         }
1006       }
1007       return hasGeneralBloom;
1008     }
1009 
1010     private boolean closeDeleteFamilyBloomFilter() throws IOException {
1011       boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);
1012 
1013       // add the delete family Bloom filter writer
1014       if (hasDeleteFamilyBloom) {
1015         writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
1016       }
1017 
1018       // append file info about the number of delete family kvs
1019       // even if there is no delete family Bloom.
1020       writer.appendFileInfo(DELETE_FAMILY_COUNT,
1021           Bytes.toBytes(this.deleteFamilyCnt));
1022 
1023       return hasDeleteFamilyBloom;
1024     }
1025 
1026     public void close() throws IOException {
1027       boolean hasGeneralBloom = this.closeGeneralBloomFilter();
1028       boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
1029 
1030       writer.close();
1031 
1032       // Log final Bloom filter statistics. This needs to be done after close()
1033       // because compound Bloom filters might be finalized as part of closing.
1034       StoreFile.LOG.info((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
1035           + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily"
1036           + " was added to HFile (" + getPath() + ") ");
1037 
1038     }
1039 
1040     public void appendFileInfo(byte[] key, byte[] value) throws IOException {
1041       writer.appendFileInfo(key, value);
1042     }
1043 
1044     /** For use in testing, e.g. {@link org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile}
1045      */
1046     HFile.Writer getHFileWriter() {
1047       return writer;
1048     }
1049   }
1050 
1051   /**
1052    * Reader for a StoreFile.
1053    */
1054   public static class Reader {
1055     static final Log LOG = LogFactory.getLog(Reader.class.getName());
1056 
1057     protected BloomFilter generalBloomFilter = null;
1058     protected BloomFilter deleteFamilyBloomFilter = null;
1059     protected BloomType bloomFilterType;
1060     private final HFile.Reader reader;
1061     protected TimeRangeTracker timeRangeTracker = null;
1062     protected long sequenceID = -1;
1063     private byte[] lastBloomKey;
1064     private long deleteFamilyCnt = -1;
1065 
1066     public Reader(FileSystem fs, Path path, CacheConfig cacheConf,
1067         DataBlockEncoding preferredEncodingInCache) throws IOException {
1068       reader = HFile.createReaderWithEncoding(fs, path, cacheConf,
1069           preferredEncodingInCache);
1070       bloomFilterType = BloomType.NONE;
1071     }
1072 
1073     public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
1074         CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache) throws IOException {
1075       reader = HFile.createReaderWithEncoding(
1076           fs, path, in, size, cacheConf, preferredEncodingInCache);
1077       bloomFilterType = BloomType.NONE;
1078     }
1079 
1080     /**
1081      * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
1082      */
1083     Reader() {
1084       this.reader = null;
1085     }
1086 
1087     public RawComparator<byte []> getComparator() {
1088       return reader.getComparator();
1089     }
1090 
1091     /**
1092      * Get a scanner to scan over this StoreFile. Do not use
1093      * this overload if using this scanner for compactions.
1094      *
1095      * @param cacheBlocks should this scanner cache blocks?
1096      * @param pread use pread (for highly concurrent small readers)
1097      * @return a scanner
1098      */
1099     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1100                                                boolean pread) {
1101       return getStoreFileScanner(cacheBlocks, pread, false);
1102     }
1103 
1104     /**
1105      * Get a scanner to scan over this StoreFile.
1106      *
1107      * @param cacheBlocks should this scanner cache blocks?
1108      * @param pread use pread (for highly concurrent small readers)
1109      * @param isCompaction is scanner being used for compaction?
1110      * @return a scanner
1111      */
1112     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1113                                                boolean pread,
1114                                                boolean isCompaction) {
1115       return new StoreFileScanner(this,
1116                                  getScanner(cacheBlocks, pread,
1117                                             isCompaction), !isCompaction);
1118     }
1119 
1120     /**
1121      * Warning: Do not write further code which depends on this call. Instead
1122      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
1123      * which is the preferred way to scan a store with higher level concepts.
1124      *
1125      * @param cacheBlocks should we cache the blocks?
1126      * @param pread use pread (for concurrent small readers)
1127      * @return the underlying HFileScanner
1128      */
1129     @Deprecated
1130     public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
1131       return getScanner(cacheBlocks, pread, false);
1132     }
1133 
1134     /**
1135      * Warning: Do not write further code which depends on this call. Instead
1136      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
1137      * which is the preferred way to scan a store with higher level concepts.
1138      *
1139      * @param cacheBlocks
1140      *          should we cache the blocks?
1141      * @param pread
1142      *          use pread (for concurrent small readers)
1143      * @param isCompaction
1144      *          is scanner being used for compaction?
1145      * @return the underlying HFileScanner
1146      */
1147     @Deprecated
1148     public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
1149         boolean isCompaction) {
1150       return reader.getScanner(cacheBlocks, pread, isCompaction);
1151     }
1152 
1153     public void close(boolean evictOnClose) throws IOException {
1154       reader.close(evictOnClose);
1155     }
1156 
1157     /**
1158      * Check if this storeFile may contain keys within the TimeRange that
1159      * have not expired (i.e. not older than oldestUnexpiredTS).
1160      * @param scan the current scan
1161      * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
1162      *          determined by the column family's TTL
1163      * @return false if queried keys definitely don't exist in this StoreFile
1164      */
1165     boolean passesTimerangeFilter(Scan scan, long oldestUnexpiredTS) {
1166       if (timeRangeTracker == null) {
1167         return true;
1168       } else {
1169         return timeRangeTracker.includesTimeRange(scan.getTimeRange()) &&
1170             timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
1171       }
1172     }
1173 
1174     /**
1175      * Checks whether the given scan passes the Bloom filter (if present). Only
1176      * checks Bloom filters for single-row or single-row-column scans. Bloom
1177      * filter checking for multi-gets is implemented as part of the store
1178      * scanner system (see {@link StoreFileScanner#seekExactly}) and uses
1179      * the lower-level API {@link #passesGeneralBloomFilter(byte[], int, int, byte[],
1180      * int, int)}.
1181      *
1182      * @param scan the scan specification. Used to determine the row, and to
1183      *          check whether this is a single-row ("get") scan.
1184      * @param columns the set of columns. Only used for row-column Bloom
1185      *          filters.
1186      * @return true if the scan with the given column set passes the Bloom
1187      *         filter, or if the Bloom filter is not applicable for the scan.
1188      *         False if the Bloom filter is applicable and the scan fails it.
1189      */
1190      boolean passesBloomFilter(Scan scan,
1191         final SortedSet<byte[]> columns) {
1192       // Multi-column non-get scans will use Bloom filters through the
1193       // lower-level API function that this function calls.
1194       if (!scan.isGetScan()) {
1195         return true;
1196       }
1197 
1198       byte[] row = scan.getStartRow();
1199       switch (this.bloomFilterType) {
1200         case ROW:
1201           return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0);
1202 
1203         case ROWCOL:
1204           if (columns != null && columns.size() == 1) {
1205             byte[] column = columns.first();
1206             return passesGeneralBloomFilter(row, 0, row.length, column, 0,
1207                 column.length);
1208           }
1209 
1210           // For multi-column queries the Bloom filter is checked from the
1211           // seekExact operation.
1212           return true;
1213 
1214         default:
1215           return true;
1216       }
1217     }
1218 
1219     public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
1220         int rowLen) {
1221       // Cache Bloom filter as a local variable in case it is set to null by
1222       // another thread on an IO error.
1223       BloomFilter bloomFilter = this.deleteFamilyBloomFilter;
1224 
1225       // Empty file or there is no delete family at all
1226       if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
1227         return false;
1228       }
1229 
1230       if (bloomFilter == null) {
1231         return true;
1232       }
1233 
1234       try {
1235         if (!bloomFilter.supportsAutoLoading()) {
1236           return true;
1237         }
1238         return bloomFilter.contains(row, rowOffset, rowLen, null);
1239       } catch (IllegalArgumentException e) {
1240         LOG.error("Bad Delete Family bloom filter data -- proceeding without",
1241             e);
1242         setDeleteFamilyBloomFilterFaulty();
1243       }
1244 
1245       return true;
1246     }
1247 
1248     /**
1249      * A method for checking Bloom filters. Called directly from
1250      * StoreFileScanner in case of a multi-column query.
1251      *
1252      * @param row
1253      * @param rowOffset
1254      * @param rowLen
1255      * @param col
1256      * @param colOffset
1257      * @param colLen
1258      * @return True if passes
1259      */
1260     public boolean passesGeneralBloomFilter(byte[] row, int rowOffset,
1261         int rowLen, byte[] col, int colOffset, int colLen) {
1262       // Cache Bloom filter as a local variable in case it is set to null by
1263       // another thread on an IO error.
1264       BloomFilter bloomFilter = this.generalBloomFilter;
1265       if (bloomFilter == null) {
1266         return true;
1267       }
1268 
1269       byte[] key;
1270       switch (bloomFilterType) {
1271         case ROW:
1272           if (col != null) {
1273             throw new RuntimeException("Row-only Bloom filter called with " +
1274                 "column specified");
1275           }
1276           if (rowOffset != 0 || rowLen != row.length) {
1277               throw new AssertionError("For row-only Bloom filters the row "
1278                   + "must occupy the whole array");
1279           }
1280           key = row;
1281           break;
1282 
1283         case ROWCOL:
1284           key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
1285               colOffset, colLen);
1286           break;
1287 
1288         default:
1289           return true;
1290       }
1291 
1292       // Empty file
1293       if (reader.getTrailer().getEntryCount() == 0)
1294         return false;
1295 
1296       try {
1297         boolean shouldCheckBloom;
1298         ByteBuffer bloom;
1299         if (bloomFilter.supportsAutoLoading()) {
1300           bloom = null;
1301           shouldCheckBloom = true;
1302         } else {
1303           bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
1304               true);
1305           shouldCheckBloom = bloom != null;
1306         }
1307 
1308         if (shouldCheckBloom) {
1309           boolean exists;
1310 
1311           // Whether the primary Bloom key is greater than the last Bloom key
1312           // from the file info. For row-column Bloom filters this is not yet
1313           // a sufficient condition to return false.
1314           boolean keyIsAfterLast = lastBloomKey != null
1315               && bloomFilter.getComparator().compare(key, lastBloomKey) > 0;
1316 
1317           if (bloomFilterType == BloomType.ROWCOL) {
1318             // Since a Row Delete is essentially a DeleteFamily applied to all
1319             // columns, a file might be skipped if using row+col Bloom filter.
1320             // In order to ensure this file is included an additional check is
1321             // required looking only for a row bloom.
1322             byte[] rowBloomKey = bloomFilter.createBloomKey(row, 0, row.length,
1323                 null, 0, 0);
1324 
1325             if (keyIsAfterLast
1326                 && bloomFilter.getComparator().compare(rowBloomKey,
1327                     lastBloomKey) > 0) {
1328               exists = false;
1329             } else {
1330               exists =
1331                   bloomFilter.contains(key, 0, key.length, bloom) ||
1332                   bloomFilter.contains(rowBloomKey, 0, rowBloomKey.length,
1333                       bloom);
1334             }
1335           } else {
1336             exists = !keyIsAfterLast
1337                 && bloomFilter.contains(key, 0, key.length, bloom);
1338           }
1339 
1340           return exists;
1341         }
1342       } catch (IOException e) {
1343         LOG.error("Error reading bloom filter data -- proceeding without",
1344             e);
1345         setGeneralBloomFilterFaulty();
1346       } catch (IllegalArgumentException e) {
1347         LOG.error("Bad bloom filter data -- proceeding without", e);
1348         setGeneralBloomFilterFaulty();
1349       }
1350 
1351       return true;
1352     }
1353 
1354     /**
1355      * Checks whether the given scan rowkey range overlaps with the current storefile's
1356      * @param scan the scan specification. Used to determine the rowkey range.
1357      * @return true if there is overlap, false otherwise
1358      */
1359     public boolean passesKeyRangeFilter(Scan scan) {
1360       if (this.getFirstKey() == null || this.getLastKey() == null) {
1361         // the file is empty
1362         return false;
1363       }
1364       if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
1365           && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
1366         return true;
1367       }
1368       KeyValue startKeyValue = KeyValue.createFirstOnRow(scan.getStartRow());
1369       KeyValue stopKeyValue = KeyValue.createLastOnRow(scan.getStopRow());
1370       boolean nonOverLapping = (getComparator().compare(this.getFirstKey(),
1371         stopKeyValue.getKey()) > 0 && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1372           || getComparator().compare(this.getLastKey(), startKeyValue.getKey()) < 0;
1373       return !nonOverLapping;
1374     }
1375 
1376     public Map<byte[], byte[]> loadFileInfo() throws IOException {
1377       Map<byte [], byte []> fi = reader.loadFileInfo();
1378 
1379       byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
1380       if (b != null) {
1381         bloomFilterType = BloomType.valueOf(Bytes.toString(b));
1382       }
1383 
1384       lastBloomKey = fi.get(LAST_BLOOM_KEY);
1385       byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
1386       if (cnt != null) {
1387         deleteFamilyCnt = Bytes.toLong(cnt);
1388       }
1389 
1390       return fi;
1391     }
1392 
1393     public void loadBloomfilter() {
1394       this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
1395       this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
1396     }
1397 
1398     private void loadBloomfilter(BlockType blockType) {
1399       try {
1400         if (blockType == BlockType.GENERAL_BLOOM_META) {
1401           if (this.generalBloomFilter != null)
1402             return; // Bloom has been loaded
1403 
1404           DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
1405           if (bloomMeta != null) {
1406             // sanity check for NONE Bloom filter
1407             if (bloomFilterType == BloomType.NONE) {
1408               throw new IOException(
1409                   "valid bloom filter type not found in FileInfo");
1410             } else {
1411               generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
1412                   reader);
1413               LOG.info("Loaded " + bloomFilterType.toString() + " ("
1414                   + generalBloomFilter.getClass().getSimpleName()
1415                   + ") metadata for " + reader.getName());
1416             }
1417           }
1418         } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1419           if (this.deleteFamilyBloomFilter != null)
1420             return; // Bloom has been loaded
1421 
1422           DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
1423           if (bloomMeta != null) {
1424             deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
1425                 bloomMeta, reader);
1426             LOG.info("Loaded Delete Family Bloom ("
1427                 + deleteFamilyBloomFilter.getClass().getSimpleName()
1428                 + ") metadata for " + reader.getName());
1429           }
1430         } else {
1431           throw new RuntimeException("Block Type: " + blockType.toString()
1432               + "is not supported for Bloom filter");
1433         }
1434       } catch (IOException e) {
1435         LOG.error("Error reading bloom filter meta for " + blockType
1436             + " -- proceeding without", e);
1437         setBloomFilterFaulty(blockType);
1438       } catch (IllegalArgumentException e) {
1439         LOG.error("Bad bloom filter meta " + blockType
1440             + " -- proceeding without", e);
1441         setBloomFilterFaulty(blockType);
1442       }
1443     }
1444 
1445     private void setBloomFilterFaulty(BlockType blockType) {
1446       if (blockType == BlockType.GENERAL_BLOOM_META) {
1447         setGeneralBloomFilterFaulty();
1448       } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1449         setDeleteFamilyBloomFilterFaulty();
1450       }
1451     }
1452 
1453     /**
1454      * The number of Bloom filter entries in this store file, or an estimate
1455      * thereof, if the Bloom filter is not loaded. This always returns an upper
1456      * bound of the number of Bloom filter entries.
1457      *
1458      * @return an estimate of the number of Bloom filter entries in this file
1459      */
1460     public long getFilterEntries() {
1461       return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
1462           : reader.getEntries();
1463     }
1464 
1465     public void setGeneralBloomFilterFaulty() {
1466       generalBloomFilter = null;
1467     }
1468 
1469     public void setDeleteFamilyBloomFilterFaulty() {
1470       this.deleteFamilyBloomFilter = null;
1471     }
1472 
1473     public byte[] getLastKey() {
1474       return reader.getLastKey();
1475     }
1476 
1477     public byte[] midkey() throws IOException {
1478       return reader.midkey();
1479     }
1480 
1481     public long length() {
1482       return reader.length();
1483     }
1484 
1485     public long getTotalUncompressedBytes() {
1486       return reader.getTrailer().getTotalUncompressedBytes();
1487     }
1488 
1489     public long getEntries() {
1490       return reader.getEntries();
1491     }
1492 
1493     public long getDeleteFamilyCnt() {
1494       return deleteFamilyCnt;
1495     }
1496 
1497     public byte[] getFirstKey() {
1498       return reader.getFirstKey();
1499     }
1500 
1501     public long indexSize() {
1502       return reader.indexSize();
1503     }
1504 
1505     public BloomType getBloomFilterType() {
1506       return this.bloomFilterType;
1507     }
1508 
1509     public long getSequenceID() {
1510       return sequenceID;
1511     }
1512 
1513     public void setSequenceID(long sequenceID) {
1514       this.sequenceID = sequenceID;
1515     }
1516 
1517     BloomFilter getGeneralBloomFilter() {
1518       return generalBloomFilter;
1519     }
1520 
1521     long getUncompressedDataIndexSize() {
1522       return reader.getTrailer().getUncompressedDataIndexSize();
1523     }
1524 
1525     public long getTotalBloomSize() {
1526       if (generalBloomFilter == null)
1527         return 0;
1528       return generalBloomFilter.getByteSize();
1529     }
1530 
1531     public int getHFileVersion() {
1532       return reader.getTrailer().getMajorVersion();
1533     }
1534 
1535     public int getHFileMinorVersion() {
1536       return reader.getTrailer().getMinorVersion();
1537     }
1538 
1539     public HFile.Reader getHFileReader() {
1540       return reader;
1541     }
1542 
1543     void disableBloomFilterForTesting() {
1544       generalBloomFilter = null;
1545       this.deleteFamilyBloomFilter = null;
1546     }
1547 
1548     public long getMaxTimestamp() {
1549       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.maximumTimestamp;
1550     }
1551   }
1552 
1553   /**
1554    * Useful comparators for comparing StoreFiles.
1555    */
1556   public abstract static class Comparators {
1557     /**
1558      * Comparator that compares based on the Sequence Ids of the
1559      * the StoreFiles. Bulk loads that did not request a seq ID
1560      * are given a seq id of -1; thus, they are placed before all non-
1561      * bulk loads, and bulk loads with sequence Id. Among these files,
1562      * the size is used to determine the ordering, then bulkLoadTime.
1563      * If there are ties, the path name is used as a tie-breaker.
1564      */
1565     public static final Comparator<StoreFile> SEQ_ID =
1566       Ordering.compound(ImmutableList.of(
1567           Ordering.natural().onResultOf(new GetSeqId()),
1568           Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1569           Ordering.natural().onResultOf(new GetBulkTime()),
1570           Ordering.natural().onResultOf(new GetPathName())
1571       ));
1572 
1573     private static class GetSeqId implements Function<StoreFile, Long> {
1574       @Override
1575       public Long apply(StoreFile sf) {
1576         return sf.getMaxSequenceId();
1577       }
1578     }
1579 
1580     private static class GetFileSize implements Function<StoreFile, Long> {
1581       @Override
1582       public Long apply(StoreFile sf) {
1583         return sf.getReader().length();
1584       }
1585     }
1586 
1587     private static class GetBulkTime implements Function<StoreFile, Long> {
1588       @Override
1589       public Long apply(StoreFile sf) {
1590         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1591         return sf.getBulkLoadTimestamp();
1592       }
1593     }
1594 
1595     private static class GetPathName implements Function<StoreFile, String> {
1596       @Override
1597       public String apply(StoreFile sf) {
1598         return sf.getPath().getName();
1599       }
1600     }
1601   }
1602 }