1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.DataInput;
23  import java.io.FileNotFoundException;
24  import java.io.IOException;
25  import java.nio.ByteBuffer;
26  import java.util.Arrays;
27  import java.util.Collection;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.Map;
31  import java.util.SortedSet;
32  import java.util.UUID;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.regex.Matcher;
35  import java.util.regex.Pattern;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FileStatus;
42  import org.apache.hadoop.fs.FileSystem;
43  import org.apache.hadoop.fs.Path;
44  import org.apache.hadoop.hbase.HBaseFileSystem;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
47  import org.apache.hadoop.hbase.KeyValue;
48  import org.apache.hadoop.hbase.KeyValue.KVComparator;
49  import org.apache.hadoop.hbase.client.Scan;
50  import org.apache.hadoop.hbase.fs.HFileSystem;
51  import org.apache.hadoop.hbase.io.HFileLink;
52  import org.apache.hadoop.hbase.io.HalfStoreFileReader;
53  import org.apache.hadoop.hbase.io.Reference;
54  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
55  import org.apache.hadoop.hbase.io.hfile.BlockType;
56  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
57  import org.apache.hadoop.hbase.io.hfile.Compression;
58  import org.apache.hadoop.hbase.io.hfile.HFile;
59  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
60  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
61  import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
62  import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
63  import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
64  import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
65  import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
66  import org.apache.hadoop.hbase.util.BloomFilter;
67  import org.apache.hadoop.hbase.util.BloomFilterFactory;
68  import org.apache.hadoop.hbase.util.BloomFilterWriter;
69  import org.apache.hadoop.hbase.util.Bytes;
70  import org.apache.hadoop.hbase.util.ChecksumType;
71  import org.apache.hadoop.hbase.util.FSUtils;
72  import org.apache.hadoop.hbase.util.Writables;
73  import org.apache.hadoop.io.RawComparator;
74  import org.apache.hadoop.io.WritableUtils;
75  
76  import com.google.common.base.Function;
77  import com.google.common.base.Preconditions;
78  import com.google.common.collect.ImmutableList;
79  import com.google.common.collect.Ordering;
80  
81  /**
82   * A Store data file.  Stores usually have one or more of these files.  They
83   * are produced by flushing the memstore to disk.  To create one,
84   * instantiate a writer using {@link StoreFile.WriterBuilder}
85   * and append data. Be sure to add any metadata before calling close on the
86   * Writer (use the appendMetadata convenience methods). On close, a StoreFile
87   * is sitting in the filesystem.  To refer to it, create a StoreFile instance
88   * passing filesystem and path.  To read, call {@link #createReader()}.
89   * <p>StoreFiles may also reference store files in another Store.
90   *
91   * The reason for this weird pattern, where you use a different instance for
92   * the writer and the reader, is that we write once but read a lot more.
93   */
94  public class StoreFile extends SchemaConfigured {
95    static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
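      // A minimal lifecycle sketch of the pattern described in the class
      // comment above. Illustrative only (not part of the original source);
      // `conf`, `cacheConf`, `fs`, `familyDir`, `kv`, and `maxSeqId` are
      // assumed to be in scope, and error handling is elided:
      //
      //   StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs,
      //       StoreFile.DEFAULT_BLOCKSIZE_SMALL)
      //       .withOutputDir(familyDir)
      //       .withBloomType(BloomType.ROW)
      //       .build();
      //   w.append(kv);                       // add KeyValues in sorted order
      //   w.appendMetadata(maxSeqId, false);  // metadata must go in BEFORE close
      //   w.close();
      //
      //   StoreFile sf = new StoreFile(fs, w.getPath(), conf, cacheConf,
      //       BloomType.ROW, NoOpDataBlockEncoder.INSTANCE);
      //   StoreFile.Reader r = sf.createReader();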
96  
97    public static enum BloomType {
98      /**
99       * Bloomfilters disabled
100      */
101     NONE,
102     /**
103      * Bloom enabled with Table row as Key
104      */
105     ROW,
106     /**
107      * Bloom enabled with Table row & column (family+qualifier) as Key
108      */
109     ROWCOL
110   }
111 
112   // Keys for fileinfo values in HFile
113 
114   /** Max Sequence ID in FileInfo */
115   public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
116 
117   /** Major compaction flag in FileInfo */
118   public static final byte[] MAJOR_COMPACTION_KEY =
119       Bytes.toBytes("MAJOR_COMPACTION_KEY");
120 
121   /** Minor compaction flag in FileInfo */
122   public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
123       Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
124 
125   /** Bloom filter Type in FileInfo */
126   public static final byte[] BLOOM_FILTER_TYPE_KEY =
127       Bytes.toBytes("BLOOM_FILTER_TYPE");
128 
129   /** Delete Family Count in FileInfo */
130   public static final byte[] DELETE_FAMILY_COUNT =
131       Bytes.toBytes("DELETE_FAMILY_COUNT");
132 
133   /** Last Bloom filter key in FileInfo */
134   private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
135 
136   /** Key for Timerange information in metadata*/
137   public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
138 
139   /** Key for timestamp of earliest-put in metadata*/
140   public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
141 
142   // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
143   // Need to make it 8k for testing.
144   public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
145 
146   private final FileSystem fs;
147 
148   // This file's path.
149   private final Path path;
150 
151   // If this storefile references another, this is the reference instance.
152   private Reference reference;
153 
154   // If this StoreFile references another, this is the other file's path.
155   private Path referencePath;
156 
157   // If this storefile is a link to another, this is the link instance.
158   private HFileLink link;
159 
160   // Block cache configuration and reference.
161   private final CacheConfig cacheConf;
162 
163   // What kind of data block encoding will be used
164   private final HFileDataBlockEncoder dataBlockEncoder;
165 
166   // HDFS blocks distribution information
167   private HDFSBlocksDistribution hdfsBlocksDistribution;
168 
169   // Keys for metadata stored in backing HFile.
170   // Set when we obtain a Reader.
171   private long sequenceid = -1;
172 
173   // max of the MemstoreTS in the KV's in this store
174   // Set when we obtain a Reader.
175   private long maxMemstoreTS = -1;
176 
177   public long getMaxMemstoreTS() {
178     return maxMemstoreTS;
179   }
180 
181   public void setMaxMemstoreTS(long maxMemstoreTS) {
182     this.maxMemstoreTS = maxMemstoreTS;
183   }
184 
185   // If true, this file was the product of a major compaction.  It's set
186   // whenever you get a Reader.
187   private AtomicBoolean majorCompaction = null;
188 
189   // If true, this file should not be included in minor compactions.
190   // It's set whenever you get a Reader.
191   private boolean excludeFromMinorCompaction = false;
192 
193   /** Meta key set when store file is a result of a bulk load */
194   public static final byte[] BULKLOAD_TASK_KEY =
195     Bytes.toBytes("BULKLOAD_SOURCE_TASK");
196   public static final byte[] BULKLOAD_TIME_KEY =
197     Bytes.toBytes("BULKLOAD_TIMESTAMP");
198 
199   /**
200    * Map of the metadata entries in the corresponding HFile
201    */
202   private Map<byte[], byte[]> metadataMap;
203 
204   /**
205    * A non-capturing group, for hfiles, so that this can be embedded.
206    * HFile names are hex ([0-9a-f]+). Bulk loaded hfiles have (_SeqId_[0-9]+_) as a suffix.
207    */
208   public static final String HFILE_NAME_REGEX = "[0-9a-f]+(?:_SeqId_[0-9]+_)?";
209 
210   /** Regex that will work for hfiles */
211   private static final Pattern HFILE_NAME_PATTERN =
212     Pattern.compile("^(" + HFILE_NAME_REGEX + ")");
213 
214   /**
215    * Regex that will work for straight reference names (<hfile>.<parentEncRegion>)
216    * and hfilelink reference names (<table>=<region>-<hfile>.<parentEncRegion>)
217    * If reference, then the regex has more than just one group.
218    * Group 1, hfile/hfilelink pattern, is this file's id.
219    * Group 2 '(.+)' is the reference's parent region name.
220    */
221   private static final Pattern REF_NAME_PATTERN =
222     Pattern.compile(String.format("^(%s|%s)\\.(.+)$",
223       HFILE_NAME_REGEX, HFileLink.LINK_NAME_REGEX));
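      // A few illustrative (hypothetical) names matched by the patterns above:
      //   "abc123f"                       plain hfile (HFILE_NAME_PATTERN)
      //   "abc123f_SeqId_4_"              bulk loaded hfile
      //   "abc123f.0a1b2c3d"              reference: <hfile>.<parentEncRegion>
      //   "t1=0a1b2c3d-abc123f.4e5f6a7b"  reference to an hfilelink:
      //                                   <table>=<region>-<hfile>.<parentEncRegion>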
224 
225   // StoreFile.Reader
226   private volatile Reader reader;
227 
228   /**
229    * Bloom filter type specified in column family configuration. Does not
230    * necessarily correspond to the Bloom filter type present in the HFile.
231    */
232   private final BloomType cfBloomType;
233 
234   // the last modification time stamp
235   private long modificationTimeStamp = 0L;
236 
237   /**
238    * Constructor, loads a reader and its indices, etc. May allocate a
239    * substantial amount of RAM depending on the underlying files (10-20MB?).
240    *
241    * @param fs  The current file system to use.
242    * @param p  The path of the file.
244    * @param conf  The current configuration.
245    * @param cacheConf  The cache configuration and block cache reference.
246    * @param cfBloomType The bloom type to use for this store file as specified
247    *          by column family configuration. This may or may not be the same
248    *          as the Bloom filter type actually present in the HFile, because
249    *          column family configuration might change. If this is
250    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
251    * @param dataBlockEncoder data block encoding algorithm.
252    * @throws IOException When opening the reader fails.
253    */
254   public StoreFile(final FileSystem fs,
255             final Path p,
256             final Configuration conf,
257             final CacheConfig cacheConf,
258             final BloomType cfBloomType,
259             final HFileDataBlockEncoder dataBlockEncoder)
260       throws IOException {
261     this.fs = fs;
262     this.path = p;
263     this.cacheConf = cacheConf;
264     this.dataBlockEncoder =
265         dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE
266             : dataBlockEncoder;
267 
268     if (HFileLink.isHFileLink(p)) {
269       this.link = new HFileLink(conf, p);
270       LOG.debug("Store file " + p + " is a link");
271     } else if (isReference(p)) {
272       this.reference = Reference.read(fs, p);
273       this.referencePath = getReferredToFile(this.path);
274       if (HFileLink.isHFileLink(this.referencePath)) {
275         this.link = new HFileLink(conf, this.referencePath);
276       }
277       LOG.debug("Store file " + p + " is a " + reference.getFileRegion() +
278         " reference to " + this.referencePath);
279     } else if (!isHFile(p)) {
280       throw new IOException("path=" + path + " doesn't look like a valid StoreFile");
281     }
282 
283     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
284       this.cfBloomType = cfBloomType;
285     } else {
286       LOG.info("Ignoring bloom filter check for file " + path + ": " +
287           "cfBloomType=" + cfBloomType + " (disabled in config)");
288       this.cfBloomType = BloomType.NONE;
289     }
290 
291     // cache the modification time stamp of this store file
292     FileStatus[] stats = FSUtils.listStatus(fs, p, null);
293     if (stats != null && stats.length == 1) {
294       this.modificationTimeStamp = stats[0].getModificationTime();
295     } else {
296       this.modificationTimeStamp = 0;
297     }
298 
299     SchemaMetrics.configureGlobally(conf);
300   }
301 
302   /**
303    * @return Path or null if this StoreFile was made with a Stream.
304    */
305   public Path getPath() {
306     return this.path;
307   }
308 
309   /**
310    * @return The Store/ColumnFamily this file belongs to.
311    */
312   byte [] getFamily() {
313     return Bytes.toBytes(this.path.getParent().getName());
314   }
315 
316   /**
317    * @return True if this is a StoreFile Reference; call after {@link #open()}
318    * else you may get the wrong answer.
319    */
320   boolean isReference() {
321     return this.reference != null;
322   }
323 
324   /**
325    * @return the Reference object associated to this StoreFile.
326    *         null if the StoreFile is not a reference.
327    */
328   Reference getReference() {
329     return this.reference;
330   }
331 
332   /**
333    * @return <tt>true</tt> if this StoreFile is an HFileLink
334    */
335   boolean isLink() {
336     return this.link != null && this.reference == null;
337   }
338 
339   private static boolean isHFile(final Path path) {
340     Matcher m = HFILE_NAME_PATTERN.matcher(path.getName());
341     return m.matches() && m.groupCount() > 0;
342   }
343 
344   /**
345    * @param p Path to check.
346    * @return True if the path has format of a HStoreFile reference.
347    */
348   public static boolean isReference(final Path p) {
349     return isReference(p.getName());
350   }
351 
352   /**
353    * @param name file name to check.
354    * @return True if the path has format of a HStoreFile reference.
355    */
356   public static boolean isReference(final String name) {
357     Matcher m = REF_NAME_PATTERN.matcher(name);
358     return m.matches() && m.groupCount() > 1;
359   }
360 
361   /*
362    * Return path to the file referred to by a Reference.  Presumes a directory
363    * hierarchy of <code>${hbase.rootdir}/tablename/regionname/familyname</code>.
364    * @param p Path to a Reference file.
365    * @return Calculated path to parent region file.
366    * @throws IllegalArgumentException when path regex fails to match.
367    */
368   public static Path getReferredToFile(final Path p) {
369     Matcher m = REF_NAME_PATTERN.matcher(p.getName());
370     if (m == null || !m.matches()) {
371       LOG.warn("Failed match of store file name " + p.toString());
372       throw new IllegalArgumentException("Failed match of store file name " +
373           p.toString());
374     }
375     // Other region name is suffix on the passed Reference file name
376     String otherRegion = m.group(2);
377     // Tabledir is up two directories from where Reference was written.
378     Path tableDir = p.getParent().getParent().getParent();
379     String nameStrippedOfSuffix = m.group(1);
380     LOG.debug("reference '" + p + "' to region=" + otherRegion + " hfile=" + nameStrippedOfSuffix);
381 
382     // Build up new path with the referenced region in place of our current
383     // region in the reference path.  Also strip regionname suffix from name.
384     return new Path(new Path(new Path(tableDir, otherRegion),
385       p.getParent().getName()), nameStrippedOfSuffix);
386   }
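      // For example, with hypothetical names, a reference at
      //   ${hbase.rootdir}/t1/5dd53b45/cf/abc123f.0a1b2c3d
      // resolves to the parent region's file
      //   ${hbase.rootdir}/t1/0a1b2c3d/cf/abc123f
      // i.e. the referred-to region replaces the current one and the suffix is dropped.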
387 
388   /**
389    * @return True if this file was made by a major compaction.
390    */
391   boolean isMajorCompaction() {
392     if (this.majorCompaction == null) {
393       throw new NullPointerException("This has not been set yet");
394     }
395     return this.majorCompaction.get();
396   }
397 
398   /**
399    * @return True if this file should not be part of a minor compaction.
400    */
401   boolean excludeFromMinorCompaction() {
402     return this.excludeFromMinorCompaction;
403   }
404 
405   /**
406    * @return This file's maximum edit sequence id.
407    */
408   public long getMaxSequenceId() {
409     return this.sequenceid;
410   }
411 
412   public long getModificationTimeStamp() {
413     return modificationTimeStamp;
414   }
415 
416   /**
417    * Return the largest memstoreTS found across all storefiles in
418    * the given list. Store files that were created by a mapreduce
419    * bulk load are ignored, as they do not correspond to any specific
420    * put operation, and thus do not have a memstoreTS associated with them.
421    * @return 0 if no non-bulk-load files are provided, or this is a Store that
422    * does not yet have any store files.
423    */
424   public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
425     long max = 0;
426     for (StoreFile sf : sfs) {
427       if (!sf.isBulkLoadResult()) {
428         max = Math.max(max, sf.getMaxMemstoreTS());
429       }
430     }
431     return max;
432   }
433 
434   /**
435    * Return the highest sequence ID found across all storefiles in
436    * the given list. Store files that were created by a mapreduce
437    * bulk load are ignored, as they do not correspond to any edit
438    * log items.
439    * @return 0 if no non-bulk-load files are provided, or this is a Store that
440    * does not yet have any store files.
441    */
442   public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
443     long max = 0;
444     for (StoreFile sf : sfs) {
445       max = Math.max(max, sf.getMaxSequenceId());
446     }
447     return max;
448   }
449 
450   /**
451    * @return true if this storefile was created by HFileOutputFormat
452    * for a bulk load.
453    */
454   boolean isBulkLoadResult() {
455     return metadataMap.containsKey(BULKLOAD_TIME_KEY);
456   }
457 
458   /**
459    * Return the timestamp at which this bulk load file was generated.
460    */
461   public long getBulkLoadTimestamp() {
462     return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
463   }
464 
465   /**
466    * @return the cached value of HDFS blocks distribution. The cached value is
467    * calculated when store file is opened.
468    */
469   public HDFSBlocksDistribution getHDFSBlockDistribution() {
470     return this.hdfsBlocksDistribution;
471   }
472 
473   /**
474    * Helper function to compute the HDFS blocks distribution of a given
475    * reference file. For a reference file we don't compute the exact value;
476    * we use an estimate instead, which should be good enough. We assume the
477    * bottom part takes the first half of the reference file and the top part
478    * takes the second half. This is just an estimate, given that the midkey
479    * of the region != the midkey of the HFile, and the number and size of keys vary.
480    * If this estimate isn't good enough, we can improve it later.
481    * @param fs  The FileSystem
482    * @param reference  The reference
483    * @param status  The reference FileStatus
484    * @return HDFS blocks distribution
485    */
486   static private HDFSBlocksDistribution computeRefFileHDFSBlockDistribution(
487     FileSystem fs, Reference reference, FileStatus status) throws IOException {
488     if (status == null) {
489       return null;
490     }
491 
492     long start = 0;
493     long length = 0;
494 
495     if (Reference.isTopFileRegion(reference.getFileRegion())) {
496       start = status.getLen()/2;
497       length = status.getLen() - status.getLen()/2;
498     } else {
499       start = 0;
500       length = status.getLen()/2;
501     }
502     return FSUtils.computeHDFSBlocksDistribution(fs, status, start, length);
503   }
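      // Worked example of the half-and-half estimate above: for a referred-to
      // file of length 100, a top reference is charged bytes [50, 100) and a
      // bottom reference bytes [0, 50); with an odd length, integer division
      // leaves the extra byte with the top half.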
504 
505   /**
506    * Compute the HDFS block distribution; for a reference file it is an estimate.
507    */
508   private void computeHDFSBlockDistribution() throws IOException {
509     if (isReference()) {
510       FileStatus status;
511       if (this.link != null) {
512         status = this.link.getFileStatus(fs);
513       } else {
514         status = fs.getFileStatus(this.referencePath);
515       }
516       this.hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(
517         this.fs, this.reference, status);
518     } else {
519       FileStatus status;
520       if (isLink()) {
521         status = link.getFileStatus(fs);
522       } else {
523         status = this.fs.getFileStatus(path);
524       }
525       long length = status.getLen();
526       this.hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(
527         this.fs, status, 0, length);
528     }
529   }
530 
531   /**
532    * Opens reader on this store file.  Called by {@link #createReader()}.
533    * @return Reader for the store file.
534    * @throws IOException
535    * @see #closeReader()
536    */
537   private Reader open() throws IOException {
538     if (this.reader != null) {
539       throw new IllegalAccessError("Already open");
540     }
541     if (isReference()) {
542       if (this.link != null) {
543         this.reader = new HalfStoreFileReader(this.fs, this.referencePath, this.link,
544           this.cacheConf, this.reference, dataBlockEncoder.getEncodingInCache());
545       } else {
546         this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
547           this.cacheConf, this.reference, dataBlockEncoder.getEncodingInCache());
548       }
549     } else if (isLink()) {
550       long size = link.getFileStatus(fs).getLen();
551       this.reader = new Reader(this.fs, this.path, link, size, this.cacheConf,
552           dataBlockEncoder.getEncodingInCache(), true);
553     } else {
554       this.reader = new Reader(this.fs, this.path, this.cacheConf,
555           dataBlockEncoder.getEncodingInCache());
556     }
557 
558     if (isSchemaConfigured()) {
559       SchemaConfigured.resetSchemaMetricsConf(reader);
560       passSchemaMetricsTo(reader);
561     }
562 
563     computeHDFSBlockDistribution();
564 
565     // Load up indices and fileinfo. This also loads Bloom filter type.
566     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
567 
568     // Read in our metadata.
569     byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
570     if (b != null) {
571       // By convention, if this is a half-hfile, the top half has a sequence
572       // number > the bottom half. That's why we add one below. It's done for
573       // the case the two halves are ever merged back together (rare). Without
574       // it, on open of the store, since store files are distinguished by
575       // sequence id, one half would subsume the other.
576       this.sequenceid = Bytes.toLong(b);
577       if (isReference()) {
578         if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
579           this.sequenceid += 1;
580         }
581       }
582     }
583 
584     if (isBulkLoadResult()) {
585       // generate the sequenceId from the fileName
586       // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
587       String fileName = this.path.getName();
588       int startPos = fileName.indexOf("SeqId_");
589       if (startPos != -1) {
590         this.sequenceid =
591             Long.parseLong(fileName.substring(startPos + 6, fileName.indexOf('_', startPos + 6)));
592         // Handle reference files as done above.
593         if (isReference()) {
594           if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
595             this.sequenceid += 1;
596           }
597         }
598       }
599     }
600 
601     this.reader.setSequenceID(this.sequenceid);
602 
603     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
604     if (b != null) {
605       this.maxMemstoreTS = Bytes.toLong(b);
606     }
607 
608     b = metadataMap.get(MAJOR_COMPACTION_KEY);
609     if (b != null) {
610       boolean mc = Bytes.toBoolean(b);
611       if (this.majorCompaction == null) {
612         this.majorCompaction = new AtomicBoolean(mc);
613       } else {
614         this.majorCompaction.set(mc);
615       }
616     } else {
617       // Presume it is not major compacted if it doesn't explicitly say so;
618       // HFileOutputFormat explicitly sets the major compacted key.
619       this.majorCompaction = new AtomicBoolean(false);
620     }
621 
622     b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
623     this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
624 
625     BloomType hfileBloomType = reader.getBloomFilterType();
626     if (cfBloomType != BloomType.NONE) {
627       reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
628       if (hfileBloomType != cfBloomType) {
629         LOG.info("HFile Bloom filter type for "
630             + reader.getHFileReader().getName() + ": " + hfileBloomType
631             + ", but " + cfBloomType + " specified in column family "
632             + "configuration");
633       }
634     } else if (hfileBloomType != BloomType.NONE) {
635       LOG.info("Bloom filter turned off by CF config for "
636           + reader.getHFileReader().getName());
637     }
638 
639     // load delete family bloom filter
640     reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
641 
642     try {
643       byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
644       if (timerangeBytes != null) {
645         this.reader.timeRangeTracker = new TimeRangeTracker();
646         Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
647       }
648     } catch (IllegalArgumentException e) {
649       LOG.error("Error reading timestamp range data from meta -- " +
650           "proceeding without", e);
651       this.reader.timeRangeTracker = null;
652     }
653     return this.reader;
654   }
655 
656   /**
657    * @return Reader for this StoreFile; creates one if necessary.
658    * @throws IOException
659    */
660   public Reader createReader() throws IOException {
661     if (this.reader == null) {
662       try {
663         this.reader = open();
664       } catch (IOException e) {
665         try {
666           this.closeReader(true);
667         } catch (IOException ee) { // ignore; rethrow the original exception below
668         }
669         throw e;
670       }
671 
672     }
673     return this.reader;
674   }
675 
676   /**
677    * @return Current reader.  Must call createReader first else returns null.
678    * @see #createReader()
679    */
680   public Reader getReader() {
681     return this.reader;
682   }
683 
684   /**
685    * @param evictOnClose whether to evict blocks belonging to this file
686    * @throws IOException
687    */
688   public synchronized void closeReader(boolean evictOnClose)
689       throws IOException {
690     if (this.reader != null) {
691       this.reader.close(evictOnClose);
692       this.reader = null;
693     }
694   }
695 
696   /**
697    * Delete this file
698    * @throws IOException
699    */
700   public void deleteReader() throws IOException {
701     closeReader(true);
702     HBaseFileSystem.deleteDirFromFileSystem(fs, getPath());
703   }
704 
705   @Override
706   public String toString() {
707     return this.path.toString() +
708       (isReference()? "-" + this.referencePath + "-" + reference.toString(): "");
709   }
710 
711   /**
712    * @return a lengthy description of this StoreFile, suitable for debug output
713    */
714   public String toStringDetailed() {
715     StringBuilder sb = new StringBuilder();
716     sb.append(this.path.toString());
717     sb.append(", isReference=").append(isReference());
718     sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
719     if (isBulkLoadResult()) {
720       sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
721     } else {
722       sb.append(", seqid=").append(getMaxSequenceId());
723     }
724     sb.append(", majorCompaction=").append(isMajorCompaction());
725 
726     return sb.toString();
727   }
728 
729   /**
730    * Utility to help with rename.
731    * @param fs
732    * @param src
733    * @param tgt
734    * @return True if succeeded.
735    * @throws IOException
736    */
737   public static Path rename(final FileSystem fs,
738                             final Path src,
739                             final Path tgt)
740       throws IOException {
741 
742     if (!fs.exists(src)) {
743       throw new FileNotFoundException(src.toString());
744     }
745     if (!HBaseFileSystem.renameDirForFileSystem(fs, src, tgt)) {
746       throw new IOException("Failed rename of " + src + " to " + tgt);
747     }
748     return tgt;
749   }
750 
751   public static class WriterBuilder {
752     private final Configuration conf;
753     private final CacheConfig cacheConf;
754     private final FileSystem fs;
755     private final int blockSize;
756 
757     private Compression.Algorithm compressAlgo =
758         HFile.DEFAULT_COMPRESSION_ALGORITHM;
759     private HFileDataBlockEncoder dataBlockEncoder =
760         NoOpDataBlockEncoder.INSTANCE;
761     private KeyValue.KVComparator comparator = KeyValue.COMPARATOR;
762     private BloomType bloomType = BloomType.NONE;
763     private long maxKeyCount = 0;
764     private Path dir;
765     private Path filePath;
766     private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
767     private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
768     private boolean includeMVCCReadpoint = true;
769 
770     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
771         FileSystem fs, int blockSize) {
772       this.conf = conf;
773       this.cacheConf = cacheConf;
774       this.fs = fs;
775       this.blockSize = blockSize;
776     }
777 
778     /**
779      * Use either this method or {@link #withFilePath}, but not both.
780      * @param dir Path to column family directory. The directory is created if
781      *          it does not exist. The file is given a unique name within this
782      *          directory.
783      * @return this (for chained invocation)
784      */
785     public WriterBuilder withOutputDir(Path dir) {
786       Preconditions.checkNotNull(dir);
787       this.dir = dir;
788       return this;
789     }
790 
791     /**
792      * Use either this method or {@link #withOutputDir}, but not both.
793      * @param filePath the StoreFile path to write
794      * @return this (for chained invocation)
795      */
796     public WriterBuilder withFilePath(Path filePath) {
797       Preconditions.checkNotNull(filePath);
798       this.filePath = filePath;
799       return this;
800     }
801 
802     public WriterBuilder withCompression(Compression.Algorithm compressAlgo) {
803       Preconditions.checkNotNull(compressAlgo);
804       this.compressAlgo = compressAlgo;
805       return this;
806     }
807 
808     public WriterBuilder withDataBlockEncoder(HFileDataBlockEncoder encoder) {
809       Preconditions.checkNotNull(encoder);
810       this.dataBlockEncoder = encoder;
811       return this;
812     }
813 
814     public WriterBuilder withComparator(KeyValue.KVComparator comparator) {
815       Preconditions.checkNotNull(comparator);
816       this.comparator = comparator;
817       return this;
818     }
819 
820     public WriterBuilder withBloomType(BloomType bloomType) {
821       Preconditions.checkNotNull(bloomType);
822       this.bloomType = bloomType;
823       return this;
824     }
825 
826     /**
827      * @param maxKeyCount estimated maximum number of keys we expect to add
828      * @return this (for chained invocation)
829      */
830     public WriterBuilder withMaxKeyCount(long maxKeyCount) {
831       this.maxKeyCount = maxKeyCount;
832       return this;
833     }
834 
835     /**
836      * @param checksumType the type of checksum
837      * @return this (for chained invocation)
838      */
839     public WriterBuilder withChecksumType(ChecksumType checksumType) {
840       this.checksumType = checksumType;
841       return this;
842     }
843 
844     /**
845      * @param bytesPerChecksum the number of bytes per checksum chunk
846      * @return this (for chained invocation)
847      */
848     public WriterBuilder withBytesPerChecksum(int bytesPerChecksum) {
849       this.bytesPerChecksum = bytesPerChecksum;
850       return this;
851     }
852 
853     /**
854      * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
855      * @return this (for chained invocation)
856      */
857     public WriterBuilder includeMVCCReadpoint(boolean includeMVCCReadpoint) {
858       this.includeMVCCReadpoint = includeMVCCReadpoint;
859       return this;
860     }
861 
862     /**
863      * Create a store file writer. The caller is responsible for closing the
864      * file when done. Add any metadata BEFORE closing, using
865      * {@link Writer#appendMetadata}.
866      */
867     public Writer build() throws IOException {
868       if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
869         throw new IllegalArgumentException("Either specify parent directory " +
870             "or file path");
871       }
872 
873       if (dir == null) {
874         dir = filePath.getParent();
875       }
876 
877       if (!fs.exists(dir)) {
878         HBaseFileSystem.makeDirOnFileSystem(fs, dir);
879       }
880 
881       if (filePath == null) {
882         filePath = getUniqueFile(fs, dir);
883         if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
884           bloomType = BloomType.NONE;
885         }
886       }
887 
888       if (compressAlgo == null) {
889         compressAlgo = HFile.DEFAULT_COMPRESSION_ALGORITHM;
890       }
891       if (comparator == null) {
892         comparator = KeyValue.COMPARATOR;
893       }
894       return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
895           conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType,
896           bytesPerChecksum, includeMVCCReadpoint);
897     }
898   }
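      // A second builder sketch (illustrative only; `conf`, `cacheConf`, `fs`,
      // and `tmpPath` are assumed): write to an explicit file path instead of
      // letting the builder pick a unique name in a directory. Exactly one of
      // withFilePath()/withOutputDir() may be used.
      //
      //   StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs,
      //       StoreFile.DEFAULT_BLOCKSIZE_SMALL)
      //       .withFilePath(tmpPath)
      //       .withChecksumType(ChecksumType.CRC32)
      //       .withBytesPerChecksum(16 * 1024)
      //       .build();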
899 
900   /**
901    * @param fs
902    * @param dir Directory to create file in.
903    * @return random filename inside passed <code>dir</code>
904    */
905   public static Path getUniqueFile(final FileSystem fs, final Path dir)
906       throws IOException {
907     if (!fs.getFileStatus(dir).isDir()) {
908       throw new IOException("Expecting " + dir.toString() +
909         " to be a directory");
910     }
911     return getRandomFilename(fs, dir);
912   }
913 
914   /**
915    *
916    * @param fs
917    * @param dir
918    * @return Path to a file that doesn't exist at time of this invocation.
919    * @throws IOException
920    */
921   static Path getRandomFilename(final FileSystem fs, final Path dir)
922       throws IOException {
923     return getRandomFilename(fs, dir, null);
924   }
925 
926   /**
927    *
928    * @param fs
929    * @param dir
930    * @param suffix
931    * @return Path to a file that doesn't exist at time of this invocation.
932    * @throws IOException
933    */
934   static Path getRandomFilename(final FileSystem fs,
935                                 final Path dir,
936                                 final String suffix)
937       throws IOException {
938     return new Path(dir, UUID.randomUUID().toString().replaceAll("-", "")
939         + (suffix == null ? "" : suffix));
940   }
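      // Names produced here are a dash-stripped UUID, e.g. (hypothetical)
      // "3f2504e04f8911d39a0c0305e82c3301": 32 hex characters, which is why
      // HFILE_NAME_REGEX above accepts [0-9a-f]+.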
941 
942   /**
943    * Validate the store file name.
944    * @param fileName name of the file to validate
945    * @return <tt>true</tt> if the file could be a valid store file, <tt>false</tt> otherwise
946    */
947   public static boolean validateStoreFileName(String fileName) {
948     if (HFileLink.isHFileLink(fileName))
949       return true;
950     if (isReference(fileName))
951       return true;
952     return !fileName.contains("-");
953   }
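      // Per the checks above, hypothetical names "abc123f" and
      // "t1=0a1b2c3d-abc123f" are valid, while "abc-123" is rejected: a dash
      // is only legal inside an HFileLink-style name.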
954 
955   /**
956    * Write out a split reference. Package local so it doesn't leak out of
957    * regionserver.
958    * @param fs
959    * @param splitDir Presumes path format is actually
960    *          <code>SOME_DIRECTORY/REGIONNAME/FAMILY</code>.
961    * @param f File to split.
962    * @param splitRow
963    * @param range
964    * @return Path to created reference.
965    * @throws IOException
966    */
967   static Path split(final FileSystem fs,
968                     final Path splitDir,
969                     final StoreFile f,
970                     final byte [] splitRow,
971                     final Reference.Range range)
972       throws IOException {
973
974     // Check whether the split row lies in the range of the store file
975     // If it is outside the range, return directly.
976     if (range == Reference.Range.bottom) {
977       //check if smaller than first key
978       KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
979       byte[] firstKey = f.createReader().getFirstKey();
980       // If firstKey is null, it means the storefile is empty.
981       if (firstKey == null) return null;
982       if (f.getReader().getComparator().compare(splitKey.getBuffer(), 
983           splitKey.getKeyOffset(), splitKey.getKeyLength(), 
984           firstKey, 0, firstKey.length) < 0) {
985         return null;
986       }      
987     }
988     else {
989       //check if larger than last key.
990       KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
991       byte[] lastKey = f.createReader().getLastKey();      
992       // If lastKey is null means storefile is empty.
993       if (lastKey == null) return null;
994       if (f.getReader().getComparator().compare(splitKey.getBuffer(), 
995           splitKey.getKeyOffset(), splitKey.getKeyLength(), 
996           lastKey, 0, lastKey.length) > 0) {
997         return null;
998       }
999     }
1000     
1001     // A reference to the bottom half of the hsf store file.
1002     Reference r = new Reference(splitRow, range);
1003     // Add the referred-to regions name as a dot separated suffix.
1004     // See REF_NAME_REGEX regex above.  The referred-to regions name is
1005     // up in the path of the passed in <code>f</code> -- parentdir is family,
1006     // then the directory above is the region name.
1007     String parentRegionName = f.getPath().getParent().getParent().getName();
1008     // Write reference with same file id only with the other region name as
1009     // suffix and into the new region location (under same family).
1010     Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
1011     return r.write(fs, p);
1012   }
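       // Example with hypothetical names: splitting the file
       //   ${hbase.rootdir}/t1/0a1b2c3d/cf/abc123f
       // writes a reference named "abc123f.0a1b2c3d" into the daughter
       // region's family directory, which getReferredToFile() can later
       // resolve back to the parent's file.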
1013 
1014 
1015   /**
1016    * A StoreFile writer.  Use this to write HBase Store Files. It is package
1017    * local because it is an implementation detail of the HBase regionserver.
1018    */
1019   public static class Writer {
1020     private final BloomFilterWriter generalBloomFilterWriter;
1021     private final BloomFilterWriter deleteFamilyBloomFilterWriter;
1022     private final BloomType bloomType;
1023     private byte[] lastBloomKey;
1024     private int lastBloomKeyOffset, lastBloomKeyLen;
1025     private KVComparator kvComparator;
1026     private KeyValue lastKv = null;
1027     private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
1028     private KeyValue lastDeleteFamilyKV = null;
1029     private long deleteFamilyCnt = 0;
1030 
1031     protected HFileDataBlockEncoder dataBlockEncoder;
1032 
1033     /** Checksum type */
1034     protected ChecksumType checksumType;
1035 
1036     /** Bytes per Checksum */
1037     protected int bytesPerChecksum;
1038     
1039     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
1040     /* isTimeRangeTrackerSet keeps track of whether the timeRange has already
1041      * been set. When flushing a memstore, we set the TimeRange and use this
1042      * variable to indicate that it doesn't need to be calculated again while
1043      * appending KeyValues.
1044      * It is not set in the case of compactions, when it is recalculated using
1045      * only the appended KeyValues. */
1046     boolean isTimeRangeTrackerSet = false;
1047 
1048     protected HFile.Writer writer;
1049 
1050     /**
1051      * Creates an HFile.Writer that also writes helpful metadata.
1052      * @param fs file system to write to
1053      * @param path file name to create
1054      * @param blocksize HDFS block size
1055      * @param compress HDFS block compression
1056      * @param conf user configuration
1057      * @param comparator key comparator
1058      * @param bloomType bloom filter setting
1059      * @param maxKeys the expected maximum number of keys to be added. Was used
1060      *        for Bloom filter size in {@link HFile} format version 1.
1061      * @param checksumType the checksum type
1062      * @param bytesPerChecksum the number of bytes per checksum value
1063      * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
1064      * @throws IOException problem writing to FS
1065      */
1066     private Writer(FileSystem fs, Path path, int blocksize,
1067         Compression.Algorithm compress,
1068         HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
1069         CacheConfig cacheConf,
1070         final KVComparator comparator, BloomType bloomType, long maxKeys,
1071         final ChecksumType checksumType, final int bytesPerChecksum, boolean includeMVCCReadpoint)
1072         throws IOException {
1073       this.dataBlockEncoder = dataBlockEncoder != null ?
1074           dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
1075       writer = HFile.getWriterFactory(conf, cacheConf)
1076           .withPath(fs, path)
1077           .withBlockSize(blocksize)
1078           .withCompression(compress)
1079           .withDataBlockEncoder(dataBlockEncoder)
1080           .withComparator(comparator.getRawComparator())
1081           .withChecksumType(checksumType)
1082           .withBytesPerChecksum(bytesPerChecksum)
1083           .includeMVCCReadpoint(includeMVCCReadpoint)
1084           .create();
1085 
1086       this.kvComparator = comparator;
1087 
1088       generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
1089           conf, cacheConf, bloomType,
1090           (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
1091 
1092       if (generalBloomFilterWriter != null) {
1093         this.bloomType = bloomType;
1094         LOG.info("Bloom filter type for " + path + ": " + this.bloomType + ", "
1095             + generalBloomFilterWriter.getClass().getSimpleName());
1096       } else {
1097         // Not using Bloom filters.
1098         this.bloomType = BloomType.NONE;
1099       }
1100 
1101       // initialize delete family Bloom filter when there is NO RowCol Bloom
1102       // filter
1103       if (this.bloomType != BloomType.ROWCOL) {
1104         this.deleteFamilyBloomFilterWriter = BloomFilterFactory
1105             .createDeleteBloomAtWrite(conf, cacheConf,
1106                 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
1107       } else {
1108         deleteFamilyBloomFilterWriter = null;
1109       }
1110       if (deleteFamilyBloomFilterWriter != null) {
1111         LOG.info("Delete Family Bloom filter type for " + path + ": "
1112             + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
1113       }
1114       this.checksumType = checksumType;
1115       this.bytesPerChecksum = bytesPerChecksum;
1116     }
1117 
1118     /**
1119      * Writes meta data.
1120      * Call before {@link #close()} since it's written as metadata to this file.
1121      * @param maxSequenceId Maximum sequence id.
1122      * @param majorCompaction True if this file is product of a major compaction
1123      * @throws IOException problem writing to FS
1124      */
1125     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
1126     throws IOException {
1127       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
1128       writer.appendFileInfo(MAJOR_COMPACTION_KEY,
1129           Bytes.toBytes(majorCompaction));
1130       appendTrackedTimestampsToMetadata();
1131     }
1132 
1133     /**
1134      * Add TimestampRange and earliest put timestamp to Metadata
1135      */
1136     public void appendTrackedTimestampsToMetadata() throws IOException {
1137       appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
1138       appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
1139     }
1140 
1141     /**
1142      * Set TimeRangeTracker
1143      * @param trt
1144      */
1145     public void setTimeRangeTracker(final TimeRangeTracker trt) {
1146       this.timeRangeTracker = trt;
1147       isTimeRangeTrackerSet = true;
1148     }
1149 
1150     /**
1151      * Record the earliest Put timestamp.
1152      *
1153      * If the timeRangeTracker is not set,
1154      * update TimeRangeTracker to include the timestamp of this key
1155      * @param kv
1156      */
1157     public void trackTimestamps(final KeyValue kv) {
1158       if (KeyValue.Type.Put.getCode() == kv.getType()) {
1159         earliestPutTs = Math.min(earliestPutTs, kv.getTimestamp());
1160       }
1161       if (!isTimeRangeTrackerSet) {
1162         timeRangeTracker.includeTimestamp(kv);
1163       }
1164     }
1165 
1166     private void appendGeneralBloomfilter(final KeyValue kv) throws IOException {
1167       if (this.generalBloomFilterWriter != null) {
1168         // only add to the bloom filter on a new, unique key
1169         boolean newKey = true;
1170         if (this.lastKv != null) {
1171           switch(bloomType) {
1172           case ROW:
1173             newKey = ! kvComparator.matchingRows(kv, lastKv);
1174             break;
1175           case ROWCOL:
1176             newKey = ! kvComparator.matchingRowColumn(kv, lastKv);
1177             break;
1178           case NONE:
1179             newKey = false;
1180             break;
1181           default:
1182             throw new IOException("Invalid Bloom filter type: " + bloomType +
1183                 " (ROW or ROWCOL expected)");
1184           }
1185         }
1186         if (newKey) {
1187           /*
1188            * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
1189            * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
1190            *
1191            * 2 Types of Filtering:
1192            *  1. Row = Row
1193            *  2. RowCol = Row + Qualifier
1194            */
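               // Concretely: for a KeyValue with row "r1", family "f" and
               // qualifier "q1", the ROW bloom key is the bytes of "r1", and
               // the ROWCOL bloom key is built from "r1" + "q1" (the family
               // is not part of the key).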
1195           byte[] bloomKey;
1196           int bloomKeyOffset, bloomKeyLen;
1197 
1198           switch (bloomType) {
1199           case ROW:
1200             bloomKey = kv.getBuffer();
1201             bloomKeyOffset = kv.getRowOffset();
1202             bloomKeyLen = kv.getRowLength();
1203             break;
1204           case ROWCOL:
1205             // merge(row, qualifier)
1206             // TODO: could save one buffer copy in case of compound Bloom
1207             // filters when this involves creating a KeyValue
1208             bloomKey = generalBloomFilterWriter.createBloomKey(kv.getBuffer(),
1209                 kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
1210                 kv.getQualifierOffset(), kv.getQualifierLength());
1211             bloomKeyOffset = 0;
1212             bloomKeyLen = bloomKey.length;
1213             break;
1214           default:
1215             throw new IOException("Invalid Bloom filter type: " + bloomType +
1216                 " (ROW or ROWCOL expected)");
1217           }
1218           generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
1219           if (lastBloomKey != null
1220               && generalBloomFilterWriter.getComparator().compare(bloomKey,
1221                   bloomKeyOffset, bloomKeyLen, lastBloomKey,
1222                   lastBloomKeyOffset, lastBloomKeyLen) <= 0) {
1223             throw new IOException("Non-increasing Bloom keys: "
1224                 + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen)
1225                 + " after "
1226                 + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset,
1227                     lastBloomKeyLen));
1228           }
1229           lastBloomKey = bloomKey;
1230           lastBloomKeyOffset = bloomKeyOffset;
1231           lastBloomKeyLen = bloomKeyLen;
1232           this.lastKv = kv;
1233         }
1234       }
1235     }
1236 
1237     private void appendDeleteFamilyBloomFilter(final KeyValue kv)
1238         throws IOException {
1239       if (!kv.isDeleteFamily()) {
1240         return;
1241       }
1242 
1243       // increase the number of delete family in the store file
1244       deleteFamilyCnt++;
1245       if (null != this.deleteFamilyBloomFilterWriter) {
1246         boolean newKey = true;
1247         if (lastDeleteFamilyKV != null) {
1248           newKey = !kvComparator.matchingRows(kv, lastDeleteFamilyKV);
1249         }
1250         if (newKey) {
1251           this.deleteFamilyBloomFilterWriter.add(kv.getBuffer(),
1252               kv.getRowOffset(), kv.getRowLength());
1253           this.lastDeleteFamilyKV = kv;
1254         }
1255       }
1256     }
1257 
1258     public void append(final KeyValue kv) throws IOException {
1259       appendGeneralBloomfilter(kv);
1260       appendDeleteFamilyBloomFilter(kv);
1261       writer.append(kv);
1262       trackTimestamps(kv);
1263     }
1264 
1265     public Path getPath() {
1266       return this.writer.getPath();
1267     }
1268 
1269     boolean hasGeneralBloom() {
1270       return this.generalBloomFilterWriter != null;
1271     }
1272 
1273     /**
1274      * For unit testing only.
1275      *
1276      * @return the Bloom filter used by this writer.
1277      */
1278     BloomFilterWriter getGeneralBloomWriter() {
1279       return generalBloomFilterWriter;
1280     }
1281 
1282     private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
1283       boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
1284       if (haveBloom) {
1285         bfw.compactBloom();
1286       }
1287       return haveBloom;
1288     }
1289 
1290     private boolean closeGeneralBloomFilter() throws IOException {
1291       boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);
1292 
1293       // add the general Bloom filter writer and append file info
1294       if (hasGeneralBloom) {
1295         writer.addGeneralBloomFilter(generalBloomFilterWriter);
1296         writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
1297             Bytes.toBytes(bloomType.toString()));
1298         if (lastBloomKey != null) {
1299           writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
1300               lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
1301                   + lastBloomKeyLen));
1302         }
1303       }
1304       return hasGeneralBloom;
1305     }
1306 
1307     private boolean closeDeleteFamilyBloomFilter() throws IOException {
1308       boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);
1309 
1310       // add the delete family Bloom filter writer
1311       if (hasDeleteFamilyBloom) {
1312         writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
1313       }
1314 
1315       // append file info about the number of delete family kvs
1316       // even if there is no delete family Bloom.
1317       writer.appendFileInfo(DELETE_FAMILY_COUNT,
1318           Bytes.toBytes(this.deleteFamilyCnt));
1319 
1320       return hasDeleteFamilyBloom;
1321     }
1322 
1323     public void close() throws IOException {
1324       boolean hasGeneralBloom = this.closeGeneralBloomFilter();
1325       boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
1326 
1327       writer.close();
1328 
1329       // Log final Bloom filter statistics. This needs to be done after close()
1330       // because compound Bloom filters might be finalized as part of closing.
1331       StoreFile.LOG.info((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
1332           + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily"
1333           + " was added to HFile (" + getPath() + ") ");
1334 
1335     }
1336 
1337     public void appendFileInfo(byte[] key, byte[] value) throws IOException {
1338       writer.appendFileInfo(key, value);
1339     }
1340 
1341     /** For use in testing, e.g. {@link CreateRandomStoreFile} */
1342     HFile.Writer getHFileWriter() {
1343       return writer;
1344     }
1345   }
1346 
1347   /**
1348    * Reader for a StoreFile.
1349    */
1350   public static class Reader extends SchemaConfigured {
1351     static final Log LOG = LogFactory.getLog(Reader.class.getName());
1352 
1353     protected BloomFilter generalBloomFilter = null;
1354     protected BloomFilter deleteFamilyBloomFilter = null;
1355     protected BloomType bloomFilterType;
1356     private final HFile.Reader reader;
1357     protected TimeRangeTracker timeRangeTracker = null;
1358     protected long sequenceID = -1;
1359     private byte[] lastBloomKey;
1360     private long deleteFamilyCnt = -1;
1361 
1362     public Reader(FileSystem fs, Path path, CacheConfig cacheConf,
1363         DataBlockEncoding preferredEncodingInCache) throws IOException {
1364       super(path);
1365       reader = HFile.createReaderWithEncoding(fs, path, cacheConf,
1366           preferredEncodingInCache);
1367       bloomFilterType = BloomType.NONE;
1368     }
1369 
1370     public Reader(FileSystem fs, Path path, HFileLink hfileLink, long size,
1371         CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
1372         boolean closeIStream) throws IOException {
1373       super(path);
1374 
1375       FSDataInputStream in = hfileLink.open(fs);
1376       FSDataInputStream inNoChecksum = in;
1377       if (fs instanceof HFileSystem) {
1378         FileSystem noChecksumFs = ((HFileSystem)fs).getNoChecksumFs();
1379         inNoChecksum = hfileLink.open(noChecksumFs);
1380       }
1381 
1382       reader = HFile.createReaderWithEncoding(fs, path, in, inNoChecksum,
1383                   size, cacheConf, preferredEncodingInCache, closeIStream);
1384       bloomFilterType = BloomType.NONE;
1385     }
1386 
1387     /**
1388      * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
1389      */
1390     Reader() {
1391       this.reader = null;
1392     }
1393 
1394     public RawComparator<byte []> getComparator() {
1395       return reader.getComparator();
1396     }
1397 
1398     /**
1399      * Get a scanner to scan over this StoreFile. Do not use
1400      * this overload if using this scanner for compactions.
1401      *
1402      * @param cacheBlocks should this scanner cache blocks?
1403      * @param pread use pread (for highly concurrent small readers)
1404      * @return a scanner
1405      */
1406     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1407                                                boolean pread) {
1408       return getStoreFileScanner(cacheBlocks, pread, false);
1409     }
1410 
1411     /**
1412      * Get a scanner to scan over this StoreFile.
1413      *
1414      * @param cacheBlocks should this scanner cache blocks?
1415      * @param pread use pread (for highly concurrent small readers)
1416      * @param isCompaction is scanner being used for compaction?
1417      * @return a scanner
1418      */
1419     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1420                                                boolean pread,
1421                                                boolean isCompaction) {
1422       return new StoreFileScanner(this,
1423                                  getScanner(cacheBlocks, pread,
1424                                             isCompaction), !isCompaction, reader.hasMVCCInfo());
1425     }
1426 
1427     /**
1428      * Warning: Do not write further code which depends on this call. Instead
1429      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
1430      * which is the preferred way to scan a store with higher level concepts.
1431      *
1432      * @param cacheBlocks should we cache the blocks?
1433      * @param pread use pread (for concurrent small readers)
1434      * @return the underlying HFileScanner
1435      */
1436     @Deprecated
1437     public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
1438       return getScanner(cacheBlocks, pread, false);
1439     }
1440 
1441     /**
1442      * Warning: Do not write further code which depends on this call. Instead
1443      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
1444      * which is the preferred way to scan a store with higher level concepts.
1445      *
1446      * @param cacheBlocks
1447      *          should we cache the blocks?
1448      * @param pread
1449      *          use pread (for concurrent small readers)
1450      * @param isCompaction
1451      *          is scanner being used for compaction?
1452      * @return the underlying HFileScanner
1453      */
1454     @Deprecated
1455     public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
1456         boolean isCompaction) {
1457       return reader.getScanner(cacheBlocks, pread, isCompaction);
1458     }
1459 
1460     public void close(boolean evictOnClose) throws IOException {
1461       reader.close(evictOnClose);
1462     }
1463 
1464     /**
1465      * Check if this storeFile may contain keys within the TimeRange that
1466      * have not expired (i.e. not older than oldestUnexpiredTS).
1467      * @param scan the current scan
1468      * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
1469      *          determined by the column family's TTL
1470      * @return false if queried keys definitely don't exist in this StoreFile
1471      */
1472     boolean passesTimerangeFilter(Scan scan, long oldestUnexpiredTS) {
1473       if (timeRangeTracker == null) {
1474         return true;
1475       } else {
1476         return timeRangeTracker.includesTimeRange(scan.getTimeRange()) &&
1477             timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
1478       }
1479     }
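
         /*
          * Sketch of deriving oldestUnexpiredTS from a column family TTL
          * before consulting this filter (names are hypothetical; the real
          * caller computes this during scanner setup):
          *
          *   long ttlMillis = family.getTimeToLive() * 1000L; // assumed accessor
          *   long oldestUnexpiredTS = System.currentTimeMillis() - ttlMillis;
          *   if (!reader.passesTimerangeFilter(scan, oldestUnexpiredTS)) {
          *     // every key here is outside the scan's range or expired: skip file
          *   }
          */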
1480 
1481     /**
1482      * Checks whether the given scan passes the Bloom filter (if present). Only
1483      * checks Bloom filters for single-row or single-row-column scans. Bloom
1484      * filter checking for multi-gets is implemented as part of the store
1485      * scanner system (see {@link StoreFileScanner#seekExactly}) and uses
1486      * the lower-level API {@link #passesGeneralBloomFilter(byte[], int, int, byte[],
1487      * int, int)}.
1488      *
1489      * @param scan the scan specification. Used to determine the row, and to
1490      *          check whether this is a single-row ("get") scan.
1491      * @param columns the set of columns. Only used for row-column Bloom
1492      *          filters.
1493      * @return true if the scan with the given column set passes the Bloom
1494      *         filter, or if the Bloom filter is not applicable for the scan.
1495      *         False if the Bloom filter is applicable and the scan fails it.
1496      */
1497     boolean passesBloomFilter(Scan scan,
1498         final SortedSet<byte[]> columns) {
1499       // Multi-column non-get scans will use Bloom filters through the
1500       // lower-level API function that this function calls.
1501       if (!scan.isGetScan()) {
1502         return true;
1503       }
1504 
1505       byte[] row = scan.getStartRow();
1506       switch (this.bloomFilterType) {
1507         case ROW:
1508           return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0);
1509 
1510         case ROWCOL:
1511           if (columns != null && columns.size() == 1) {
1512             byte[] column = columns.first();
1513             return passesGeneralBloomFilter(row, 0, row.length, column, 0,
1514                 column.length);
1515           }
1516 
1517           // For multi-column queries the Bloom filter is checked from the
1518           // seekExact operation.
1519           return true;
1520 
1521         default:
1522           return true;
1523       }
1524     }
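
         /*
          * Illustrative call (a sketch; "row" and "qualifier" are hypothetical
          * byte arrays). A scan whose start row equals its stop row is treated
          * as a single-row "get" scan, the only shape this method checks:
          *
          *   Scan scan = new Scan(row, row);
          *   SortedSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
          *   cols.add(qualifier); // exactly one column engages a ROWCOL filter
          *   boolean mayContain = reader.passesBloomFilter(scan, cols);
          */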
1525 
1526     public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
1527         int rowLen) {
1528       // Cache Bloom filter as a local variable in case it is set to null by
1529       // another thread on an IO error.
1530       BloomFilter bloomFilter = this.deleteFamilyBloomFilter;
1531 
1532       // Empty file, or no DeleteFamily cells at all
1533       if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
1534         return false;
1535       }
1536 
1537       if (bloomFilter == null) {
1538         return true;
1539       }
1540 
1541       try {
1542         if (!bloomFilter.supportsAutoLoading()) {
1543           return true;
1544         }
1545         return bloomFilter.contains(row, rowOffset, rowLen, null);
1546       } catch (IllegalArgumentException e) {
1547         LOG.error("Bad Delete Family bloom filter data -- proceeding without",
1548             e);
1549         setDeleteFamilyBloomFilterFaulty();
1550       }
1551 
1552       return true;
1553     }
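
         /*
          * Usage sketch: a caller can cheaply rule this file out before
          * searching it for DeleteFamily markers ("row" is hypothetical):
          *
          *   if (!reader.passesDeleteFamilyBloomFilter(row, 0, row.length)) {
          *     // the file definitely holds no DeleteFamily cell for this row
          *   }
          */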
1554 
1555     /**
1556      * A method for checking Bloom filters. Called directly from
1557      * StoreFileScanner in case of a multi-column query.
1558      *
1559      * @param row the row key to check
1560      * @param rowOffset offset of the row key in the array
1561      * @param rowLen length of the row key
1562      * @param col the column qualifier, or null for row-only Bloom filters
1563      * @param colOffset offset of the qualifier in the array
1564      * @param colLen length of the qualifier
1565      * @return true if the key may be present, false if the Bloom filter rules it out
1566      */
1567     public boolean passesGeneralBloomFilter(byte[] row, int rowOffset,
1568         int rowLen, byte[] col, int colOffset, int colLen) {
1569       if (generalBloomFilter == null)
1570         return true;
1571 
1572       byte[] key;
1573       switch (bloomFilterType) {
1574         case ROW:
1575           if (col != null) {
1576             throw new RuntimeException("Row-only Bloom filter called with " +
1577                 "column specified");
1578           }
1579           if (rowOffset != 0 || rowLen != row.length) {
1580               throw new AssertionError("For row-only Bloom filters the row "
1581                   + "must occupy the whole array");
1582           }
1583           key = row;
1584           break;
1585 
1586         case ROWCOL:
1587           key = generalBloomFilter.createBloomKey(row, rowOffset, rowLen, col,
1588               colOffset, colLen);
1589           break;
1590 
1591         default:
1592           return true;
1593       }
1594 
1595       // Cache Bloom filter as a local variable in case it is set to null by
1596       // another thread on an IO error.
1597       BloomFilter bloomFilter = this.generalBloomFilter;
1598 
1599       if (bloomFilter == null) {
1600         return true;
1601       }
1602 
1603       // Empty file
1604       if (reader.getTrailer().getEntryCount() == 0)
1605         return false;
1606 
1607       try {
1608         boolean shouldCheckBloom;
1609         ByteBuffer bloom;
1610         if (bloomFilter.supportsAutoLoading()) {
1611           bloom = null;
1612           shouldCheckBloom = true;
1613         } else {
1614           bloom = reader.getMetaBlock(HFileWriterV1.BLOOM_FILTER_DATA_KEY,
1615               true);
1616           shouldCheckBloom = bloom != null;
1617         }
1618 
1619         if (shouldCheckBloom) {
1620           boolean exists;
1621 
1622           // Whether the primary Bloom key is greater than the last Bloom key
1623           // from the file info. For row-column Bloom filters this is not yet
1624           // a sufficient condition to return false.
1625           boolean keyIsAfterLast = lastBloomKey != null
1626               && bloomFilter.getComparator().compare(key, lastBloomKey) > 0;
1627 
1628           if (bloomFilterType == BloomType.ROWCOL) {
1629             // Since a row delete is essentially a DeleteFamily applied to all
1630             // columns, a file holding only such a delete might be skipped by a
1631             // row+col Bloom filter. To ensure the file is included, an extra
1632             // check against a row-only Bloom key is required.
1633             byte[] rowBloomKey = bloomFilter.createBloomKey(row, 0, row.length,
1634                 null, 0, 0);
1635 
1636             if (keyIsAfterLast
1637                 && bloomFilter.getComparator().compare(rowBloomKey,
1638                     lastBloomKey) > 0) {
1639               exists = false;
1640             } else {
1641               exists =
1642                   bloomFilter.contains(key, 0, key.length, bloom) ||
1643                   bloomFilter.contains(rowBloomKey, 0, rowBloomKey.length,
1644                       bloom);
1645             }
1646           } else {
1647             exists = !keyIsAfterLast
1648                 && bloomFilter.contains(key, 0, key.length, bloom);
1649           }
1650 
1651           getSchemaMetrics().updateBloomMetrics(exists);
1652           return exists;
1653         }
1654       } catch (IOException e) {
1655         LOG.error("Error reading bloom filter data -- proceeding without",
1656             e);
1657         setGeneralBloomFilterFaulty();
1658       } catch (IllegalArgumentException e) {
1659         LOG.error("Bad bloom filter data -- proceeding without", e);
1660         setGeneralBloomFilterFaulty();
1661       }
1662 
1663       return true;
1664     }
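
         /*
          * Sketch of the per-column call issued from StoreFileScanner during a
          * multi-column get ("row" and "col" are hypothetical byte arrays):
          *
          *   boolean mayContain = reader.passesGeneralBloomFilter(
          *       row, 0, row.length, col, 0, col.length);
          *   // for a ROW-type filter, pass col == null with colOffset/colLen 0
          */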
1665 
1666     /**
1667      * Checks whether the given scan's row-key range overlaps the key range of this store file.
1668      * @param scan the scan specification; used to determine the row-key range
1669      * @return true if the ranges overlap, false otherwise
1670      */
1671     public boolean passesKeyRangeFilter(Scan scan) {
1672       if (this.getFirstKey() == null || this.getLastKey() == null) {
1673         // the file is empty
1674         return false;
1675       }
1676       if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
1677           && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
1678         return true;
1679       }
1680       KeyValue startKeyValue = KeyValue.createFirstOnRow(scan.getStartRow());
1681       KeyValue stopKeyValue = KeyValue.createLastOnRow(scan.getStopRow());
1682       boolean nonOverLapping = (getComparator().compare(this.getFirstKey(),
1683         stopKeyValue.getKey()) > 0 && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))
1684           || getComparator().compare(this.getLastKey(), startKeyValue.getKey()) < 0;
1685       return !nonOverLapping;
1686     }
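
         /*
          * Worked example with hypothetical keys: a file spanning rows
          * "bbb".."ddd" overlaps scans ["aaa","ccc") and ["ccc","eee"), but not
          * ["eee","fff"); empty start and stop rows mean an unbounded scan,
          * which overlaps any non-empty file:
          *
          *   Scan scan = new Scan(Bytes.toBytes("eee"), Bytes.toBytes("fff"));
          *   assert !reader.passesKeyRangeFilter(scan); // file can be skipped
          */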
1687 
1688     public Map<byte[], byte[]> loadFileInfo() throws IOException {
1689       Map<byte [], byte []> fi = reader.loadFileInfo();
1690 
1691       byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
1692       if (b != null) {
1693         bloomFilterType = BloomType.valueOf(Bytes.toString(b));
1694       }
1695 
1696       lastBloomKey = fi.get(LAST_BLOOM_KEY);
1697       byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
1698       if (cnt != null) {
1699         deleteFamilyCnt = Bytes.toLong(cnt);
1700       }
1701 
1702       return fi;
1703     }
1704 
1705     public void loadBloomfilter() {
1706       this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
1707       this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
1708     }
1709 
1710     private void loadBloomfilter(BlockType blockType) {
1711       try {
1712         if (blockType == BlockType.GENERAL_BLOOM_META) {
1713           if (this.generalBloomFilter != null)
1714             return; // Bloom has been loaded
1715 
1716           DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
1717           if (bloomMeta != null) {
1718             // sanity check for NONE Bloom filter
1719             if (bloomFilterType == BloomType.NONE) {
1720               throw new IOException(
1721                   "valid bloom filter type not found in FileInfo");
1722             } else {
1723               generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
1724                   reader);
1725               LOG.info("Loaded " + bloomFilterType.toString() + " ("
1726                   + generalBloomFilter.getClass().getSimpleName()
1727                   + ") metadata for " + reader.getName());
1728             }
1729           }
1730         } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1731           if (this.deleteFamilyBloomFilter != null)
1732             return; // Bloom has been loaded
1733 
1734           DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
1735           if (bloomMeta != null) {
1736             deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
1737                 bloomMeta, reader);
1738             LOG.info("Loaded Delete Family Bloom ("
1739                 + deleteFamilyBloomFilter.getClass().getSimpleName()
1740                 + ") metadata for " + reader.getName());
1741           }
1742         } else {
1743           throw new RuntimeException("Block Type: " + blockType.toString()
1744               + "is not supported for Bloom filter");
1745         }
1746       } catch (IOException e) {
1747         LOG.error("Error reading bloom filter meta for " + blockType
1748             + " -- proceeding without", e);
1749         setBloomFilterFaulty(blockType);
1750       } catch (IllegalArgumentException e) {
1751         LOG.error("Bad bloom filter meta " + blockType
1752             + " -- proceeding without", e);
1753         setBloomFilterFaulty(blockType);
1754       }
1755     }
1756 
1757     private void setBloomFilterFaulty(BlockType blockType) {
1758       if (blockType == BlockType.GENERAL_BLOOM_META) {
1759         setGeneralBloomFilterFaulty();
1760       } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1761         setDeleteFamilyBloomFilterFaulty();
1762       }
1763     }
1764 
1765     /**
1766      * The number of Bloom filter entries in this store file, or, if the
1767      * Bloom filter is not loaded, the total entry count as an upper-bound
1768      * estimate of the number of Bloom filter entries.
1769      *
1770      * @return an estimate of the number of Bloom filter entries in this file
1771      */
1772     public long getFilterEntries() {
1773       return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
1774           : reader.getEntries();
1775     }
1776 
1777     public void setGeneralBloomFilterFaulty() {
1778       generalBloomFilter = null;
1779     }
1780 
1781     public void setDeleteFamilyBloomFilterFaulty() {
1782       this.deleteFamilyBloomFilter = null;
1783     }
1784 
1785     public byte[] getLastKey() {
1786       return reader.getLastKey();
1787     }
1788 
1789     public byte[] midkey() throws IOException {
1790       return reader.midkey();
1791     }
1792 
1793     public long length() {
1794       return reader.length();
1795     }
1796 
1797     public long getTotalUncompressedBytes() {
1798       return reader.getTrailer().getTotalUncompressedBytes();
1799     }
1800 
1801     public long getEntries() {
1802       return reader.getEntries();
1803     }
1804 
1805     public long getDeleteFamilyCnt() {
1806       return deleteFamilyCnt;
1807     }
1808 
1809     public byte[] getFirstKey() {
1810       return reader.getFirstKey();
1811     }
1812 
1813     public long indexSize() {
1814       return reader.indexSize();
1815     }
1816 
1817     public String getColumnFamilyName() {
1818       return reader.getColumnFamilyName();
1819     }
1820 
1821     public BloomType getBloomFilterType() {
1822       return this.bloomFilterType;
1823     }
1824 
1825     public long getSequenceID() {
1826       return sequenceID;
1827     }
1828 
1829     public void setSequenceID(long sequenceID) {
1830       this.sequenceID = sequenceID;
1831     }
1832 
1833     BloomFilter getGeneralBloomFilter() {
1834       return generalBloomFilter;
1835     }
1836 
1837     long getUncompressedDataIndexSize() {
1838       return reader.getTrailer().getUncompressedDataIndexSize();
1839     }
1840 
1841     public long getTotalBloomSize() {
1842       if (generalBloomFilter == null)
1843         return 0;
1844       return generalBloomFilter.getByteSize();
1845     }
1846 
1847     public int getHFileVersion() {
1848       return reader.getTrailer().getMajorVersion();
1849     }
1850 
1851     HFile.Reader getHFileReader() {
1852       return reader;
1853     }
1854 
1855     void disableBloomFilterForTesting() {
1856       generalBloomFilter = null;
1857       this.deleteFamilyBloomFilter = null;
1858     }
1859 
1860     public long getMaxTimestamp() {
1861       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.maximumTimestamp;
1862     }
1863 
1864     @Override
1865     public void schemaConfigurationChanged() {
1866       passSchemaMetricsTo((SchemaConfigured) reader);
1867     }
1868   }
1869 
1870   /**
1871    * Useful comparators for comparing StoreFiles.
1872    */
1873   abstract static class Comparators {
1874     /**
1875      * Comparator that sorts StoreFiles by their sequence IDs. Bulk
1876      * loads that did not request a sequence ID are given a seq id of
1877      * -1; thus they sort before all flush files and before bulk loads
1878      * that did receive one. Among files with equal sequence IDs, the
1879      * bulk load timestamp determines the ordering; non-bulk-load files
1880      * are treated as having the maximum possible bulk time, so bulk
1881      * loads with the same sequence ID sort ahead of them. If there
1882      * are still ties, the path name is used as the final
1883      * tie-breaker.
1884      */
1885     static final Comparator<StoreFile> SEQ_ID =
1886       Ordering.compound(ImmutableList.of(
1887           Ordering.natural().onResultOf(new GetSeqId()),
1888           Ordering.natural().onResultOf(new GetBulkTime()),
1889           Ordering.natural().onResultOf(new GetPathName())
1890       ));
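
         /*
          * Usage sketch (package-private; "files" is a hypothetical list):
          *
          *   List<StoreFile> files = ...;
          *   Collections.sort(files, StoreFile.Comparators.SEQ_ID);
          *   // files.get(files.size() - 1) now has the highest sequence ID
          */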
1891 
1892     private static class GetSeqId implements Function<StoreFile, Long> {
1893       @Override
1894       public Long apply(StoreFile sf) {
1895         return sf.getMaxSequenceId();
1896       }
1897     }
1898 
1899     private static class GetBulkTime implements Function<StoreFile, Long> {
1900       @Override
1901       public Long apply(StoreFile sf) {
1902         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1903         return sf.getBulkLoadTimestamp();
1904       }
1905     }
1906 
1907     private static class GetPathName implements Function<StoreFile, String> {
1908       @Override
1909       public String apply(StoreFile sf) {
1910         return sf.getPath().getName();
1911       }
1912     }
1913 
1914     /**
1915      * Comparator that sorts StoreFiles by file size, descending (largest first).
1916      */
1917     static final Comparator<StoreFile> FILE_SIZE =
1918       Ordering.natural().reverse().onResultOf(new Function<StoreFile, Long>() {
1919         @Override
1920         public Long apply(StoreFile sf) {
1921           return sf.getReader().length();
1922         }
1923       });
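
         /*
          * Usage sketch: ordering compaction candidates largest-first
          * ("candidates" is a hypothetical list):
          *
          *   Collections.sort(candidates, StoreFile.Comparators.FILE_SIZE);
          *   // candidates.get(0) is now the largest file by on-disk length
          */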
1924   }
1925 }