1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.DataInput;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.nio.ByteBuffer;
25  import java.util.Arrays;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.Comparator;
29  import java.util.Map;
30  import java.util.SortedSet;
31  import java.util.UUID;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
42  import org.apache.hadoop.hbase.KeyValue;
43  import org.apache.hadoop.hbase.KeyValue.KVComparator;
44  import org.apache.hadoop.hbase.client.Scan;
45  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
46  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
47  import org.apache.hadoop.hbase.io.hfile.BlockType;
48  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
49  import org.apache.hadoop.hbase.io.hfile.HFile;
50  import org.apache.hadoop.hbase.io.hfile.HFileContext;
51  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
52  import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
53  import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
54  import org.apache.hadoop.hbase.util.BloomFilter;
55  import org.apache.hadoop.hbase.util.BloomFilterFactory;
56  import org.apache.hadoop.hbase.util.BloomFilterWriter;
57  import org.apache.hadoop.hbase.util.Bytes;
58  import org.apache.hadoop.hbase.util.ChecksumType;
59  import org.apache.hadoop.hbase.util.Writables;
60  import org.apache.hadoop.io.WritableUtils;
61  
62  import com.google.common.base.Function;
63  import com.google.common.base.Preconditions;
64  import com.google.common.collect.ImmutableList;
65  import com.google.common.collect.Ordering;
66  
67  /**
68   * A Store data file.  Stores usually have one or more of these files.  They
69   * are produced by flushing the memstore to disk.  To
70   * create, instantiate a writer using {@link StoreFile.WriterBuilder}
71   * and append data. Be sure to add any metadata before calling close on the
72   * Writer (use the appendMetadata convenience methods). On close, the StoreFile
73   * resides in the filesystem.  To refer to it, create a StoreFile instance
74   * passing the filesystem and path.  To read, call {@link #createReader()}.
75   * <p>StoreFiles may also reference store files in another Store.
76   *
77   * The reason for this pattern, where a different instance is used for the
78   * writer and the reader, is that we write once but read many times.
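     *
     * <p>A minimal write-then-read sketch (illustrative only; the {@code conf},
     * {@code cacheConf}, {@code fs}, {@code path}, {@code fileContext},
     * {@code kv} and {@code maxSeqId} values are assumed to be supplied by the
     * caller):
     * <pre>
     * StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs)
     *     .withFilePath(path)
     *     .withBloomType(BloomType.ROW)
     *     .withFileContext(fileContext)
     *     .build();
     * w.append(kv);                        // add cells
     * w.appendMetadata(maxSeqId, false);   // metadata must precede close()
     * w.close();
     *
     * StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.ROW);
     * StoreFile.Reader r = sf.createReader();
     * </pre>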
79   */
80  @InterfaceAudience.LimitedPrivate("Coprocessor")
81  public class StoreFile {
82    static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
83  
84    // Keys for fileinfo values in HFile
85  
86    /** Max Sequence ID in FileInfo */
87    public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
88  
89    /** Major compaction flag in FileInfo */
90    public static final byte[] MAJOR_COMPACTION_KEY =
91        Bytes.toBytes("MAJOR_COMPACTION_KEY");
92  
93    /** Minor compaction flag in FileInfo */
94    public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
95        Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
96  
97    /** Bloom filter Type in FileInfo */
98    public static final byte[] BLOOM_FILTER_TYPE_KEY =
99        Bytes.toBytes("BLOOM_FILTER_TYPE");
100 
101   /** Delete Family Count in FileInfo */
102   public static final byte[] DELETE_FAMILY_COUNT =
103       Bytes.toBytes("DELETE_FAMILY_COUNT");
104 
105   /** Last Bloom filter key in FileInfo */
106   private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
107 
108   /** Key for Timerange information in metadata */
109   public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
110 
111   /** Key for timestamp of earliest-put in metadata */
112   public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
113 
114   private final StoreFileInfo fileInfo;
115   private final FileSystem fs;
116 
117   // Block cache configuration and reference.
118   private final CacheConfig cacheConf;
119 
120   // Keys for metadata stored in backing HFile.
121   // Set when we obtain a Reader.
122   private long sequenceid = -1;
123 
124   // max of the MemstoreTS in the KVs in this store file
125   // Set when we obtain a Reader.
126   private long maxMemstoreTS = -1;
127 
128   public long getMaxMemstoreTS() {
129     return maxMemstoreTS;
130   }
131 
132   public void setMaxMemstoreTS(long maxMemstoreTS) {
133     this.maxMemstoreTS = maxMemstoreTS;
134   }
135 
136   // If true, this file was the product of a major compaction.  It's then set
137   // whenever you get a Reader.
138   private AtomicBoolean majorCompaction = null;
139 
140   // If true, this file should not be included in minor compactions.
141   // It's set whenever you get a Reader.
142   private boolean excludeFromMinorCompaction = false;
143 
144   /** Meta key set when store file is a result of a bulk load */
145   public static final byte[] BULKLOAD_TASK_KEY =
146     Bytes.toBytes("BULKLOAD_SOURCE_TASK");
147   public static final byte[] BULKLOAD_TIME_KEY =
148     Bytes.toBytes("BULKLOAD_TIMESTAMP");
149 
150   /**
151    * Map of the metadata entries in the corresponding HFile
152    */
153   private Map<byte[], byte[]> metadataMap;
154 
155   // StoreFile.Reader
156   private volatile Reader reader;
157 
158   /**
159    * Bloom filter type specified in column family configuration. Does not
160    * necessarily correspond to the Bloom filter type present in the HFile.
161    */
162   private final BloomType cfBloomType;
163 
164   // the last modification time stamp
165   private long modificationTimeStamp = 0L;
166 
167   /**
168    * Constructor, loads a reader and its indices, etc. May allocate a
169    * substantial amount of ram depending on the underlying files (10-20MB?).
170    *
171    * @param fs  The current file system to use.
172    * @param p  The path of the file.
173    * @param conf  The current configuration.
174    * @param cacheConf  The cache configuration and block cache reference.
175    * @param cfBloomType The bloom type to use for this store file as specified
176    *          by column family configuration. This may or may not be the same
177    *          as the Bloom filter type actually present in the HFile, because
178    *          column family configuration might change. If this is
179    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
180    * @throws IOException When opening the reader fails.
181    */
182   public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
183         final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
184     this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
185   }
186 
187 
188   /**
189    * Constructor, loads a reader and its indices, etc. May allocate a
190    * substantial amount of ram depending on the underlying files (10-20MB?).
191    *
192    * @param fs  The current file system to use.
193    * @param fileInfo  The store file information.
194    * @param conf  The current configuration.
195    * @param cacheConf  The cache configuration and block cache reference.
196    * @param cfBloomType The bloom type to use for this store file as specified
197    *          by column family configuration. This may or may not be the same
198    *          as the Bloom filter type actually present in the HFile, because
199    *          column family configuration might change. If this is
200    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
201    * @throws IOException When opening the reader fails.
202    */
203   public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
204       final CacheConfig cacheConf,  final BloomType cfBloomType) throws IOException {
205     this.fs = fs;
206     this.fileInfo = fileInfo;
207     this.cacheConf = cacheConf;
208 
209     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
210       this.cfBloomType = cfBloomType;
211     } else {
212       LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
213           "cfBloomType=" + cfBloomType + " (disabled in config)");
214       this.cfBloomType = BloomType.NONE;
215     }
216 
217     // cache the modification time stamp of this store file
218     this.modificationTimeStamp = fileInfo.getModificationTime();
219   }
220 
221   /**
222    * @return the StoreFileInfo object associated with this StoreFile,
223    *         describing the underlying file.
224    */
225   StoreFileInfo getFileInfo() {
226     return this.fileInfo;
227   }
228 
229   /**
230    * @return Path or null if this StoreFile was made with a Stream.
231    */
232   public Path getPath() {
233     return this.fileInfo.getPath();
234   }
235 
236   /**
237    * @return True if this is a StoreFile reference; call after {@link #open()},
238    * else you may get a wrong answer.
239    */
240   public boolean isReference() {
241     return this.fileInfo.isReference();
242   }
243 
244   /**
245    * @return True if this file was made by a major compaction.
246    */
247   public boolean isMajorCompaction() {
248     if (this.majorCompaction == null) {
249       throw new NullPointerException("This has not been set yet");
250     }
251     return this.majorCompaction.get();
252   }
253 
254   /**
255    * @return True if this file should not be part of a minor compaction.
256    */
257   public boolean excludeFromMinorCompaction() {
258     return this.excludeFromMinorCompaction;
259   }
260 
261   /**
262    * @return This file's maximum edit sequence id.
263    */
264   public long getMaxSequenceId() {
265     return this.sequenceid;
266   }
267 
268   public long getModificationTimeStamp() {
269     return modificationTimeStamp;
270   }
271 
272   public byte[] getMetadataValue(byte[] key) {
273     return metadataMap.get(key);
274   }
275 
276   /**
277    * Return the largest memstoreTS found across all storefiles in
278    * the given list. Store files that were created by a mapreduce
279    * bulk load are ignored, as they do not correspond to any specific
280    * put operation, and thus do not have a memstoreTS associated with them.
281    * @return 0 if no non-bulk-load files are provided or this is a Store that
282    * does not yet have any store files.
283    */
284   public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
285     long max = 0;
286     for (StoreFile sf : sfs) {
287       if (!sf.isBulkLoadResult()) {
288         max = Math.max(max, sf.getMaxMemstoreTS());
289       }
290     }
291     return max;
292   }
293 
294   /**
295    * Return the highest sequence ID found across all storefiles in
296    * the given list. Store files that were created by a mapreduce
297    * bulk load are ignored, as they do not correspond to any edit
298    * log items.
299    * @param sfs the store files to examine
300    * @param includeBulkLoadedFiles whether bulk-loaded files should be considered
301    * @return 0 if no non-bulk-load files are provided or this is a Store that
302    * does not yet have any store files.
303    */
304   public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
305       boolean includeBulkLoadedFiles) {
306     long max = 0;
307     for (StoreFile sf : sfs) {
308       if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
309         max = Math.max(max, sf.getMaxSequenceId());
310       }
311     }
312     return max;
313   }
314 
315   /**
316    * @return true if this storefile was created by HFileOutputFormat
317    * for a bulk load.
318    */
319   boolean isBulkLoadResult() {
320     return metadataMap.containsKey(BULKLOAD_TIME_KEY);
321   }
322 
323   /**
324    * Return the timestamp at which this bulk load file was generated.
325    */
326   public long getBulkLoadTimestamp() {
327     return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
328   }
329 
330   /**
331    * @return the cached value of HDFS blocks distribution. The cached value is
332    * calculated when store file is opened.
333    */
334   public HDFSBlocksDistribution getHDFSBlockDistribution() {
335     return this.fileInfo.getHDFSBlockDistribution();
336   }
337 
338   /**
339    * Opens a reader on this store file.  Called by {@link #createReader()}.
340    * @return Reader for the store file.
341    * @throws IOException
342    * @see #closeReader(boolean)
343    */
344   private Reader open() throws IOException {
345     if (this.reader != null) {
346       throw new IllegalAccessError("Already open");
347     }
348 
349     // Open the StoreFile.Reader
350     this.reader = fileInfo.open(this.fs, this.cacheConf);
351 
352     // Load up indices and fileinfo. This also loads Bloom filter type.
353     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
354 
355     // Read in our metadata.
356     byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
357     if (b != null) {
358       // By convention, if this is a half HFile, the top half has a sequence
359       // number greater than the bottom half's. That's why we add one below. It's
360       // done in case the two halves are ever merged back together (rare).
361       // Without it, on open of the store, since store files are distinguished
362       // by sequence id, the one half would subsume the other.
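          // Hypothetical illustration: if MAX_SEQ_ID_KEY stores 5, the bottom
          // half reference keeps sequenceid 5 while its top half reports 6,
          // so the two halves stay distinguishable.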
363       this.sequenceid = Bytes.toLong(b);
364       if (fileInfo.isTopReference()) {
365         this.sequenceid += 1;
366       }
367     }
368 
369     if (isBulkLoadResult()) {
370       // generate the sequenceId from the fileName
371       // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
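          // e.g. a hypothetical name "a1b2c3_SeqId_42_" yields sequenceid 42 below.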
372       String fileName = this.getPath().getName();
373       int startPos = fileName.indexOf("SeqId_");
374       if (startPos != -1) {
375         this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
376             fileName.indexOf('_', startPos + 6)));
377         // Handle reference files as done above.
378         if (fileInfo.isTopReference()) {
379           this.sequenceid += 1;
380         }
381       }
382     }
383     this.reader.setSequenceID(this.sequenceid);
384 
385     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
386     if (b != null) {
387       this.maxMemstoreTS = Bytes.toLong(b);
388     }
389 
390     b = metadataMap.get(MAJOR_COMPACTION_KEY);
391     if (b != null) {
392       boolean mc = Bytes.toBoolean(b);
393       if (this.majorCompaction == null) {
394         this.majorCompaction = new AtomicBoolean(mc);
395       } else {
396         this.majorCompaction.set(mc);
397       }
398     } else {
399       // Presume it is not major compacted if it doesn't explicitly say so;
400       // HFileOutputFormat explicitly sets the major compacted key.
401       this.majorCompaction = new AtomicBoolean(false);
402     }
403 
404     b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
405     this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
406 
407     BloomType hfileBloomType = reader.getBloomFilterType();
408     if (cfBloomType != BloomType.NONE) {
409       reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
410       if (hfileBloomType != cfBloomType) {
411         LOG.info("HFile Bloom filter type for "
412             + reader.getHFileReader().getName() + ": " + hfileBloomType
413             + ", but " + cfBloomType + " specified in column family "
414             + "configuration");
415       }
416     } else if (hfileBloomType != BloomType.NONE) {
417       LOG.info("Bloom filter turned off by CF config for "
418           + reader.getHFileReader().getName());
419     }
420 
421     // load delete family bloom filter
422     reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
423 
424     try {
425       byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
426       if (timerangeBytes != null) {
427         this.reader.timeRangeTracker = new TimeRangeTracker();
428         Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
429       }
430     } catch (IllegalArgumentException e) {
431       LOG.error("Error reading timestamp range data from meta -- " +
432           "proceeding without", e);
433       this.reader.timeRangeTracker = null;
434     }
435     return this.reader;
436   }
437 
438   /**
439    * @return Reader for StoreFile; creates one if necessary
440    * @throws IOException
441    */
442   public Reader createReader() throws IOException {
443     if (this.reader == null) {
444       try {
445         this.reader = open();
446       } catch (IOException e) {
447         try {
448           this.closeReader(true);
449         } catch (IOException ee) {
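              // Intentionally ignored; the original exception is rethrown below.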
450         }
451         throw e;
452       }
453 
454     }
455     return this.reader;
456   }
457 
458   /**
459    * @return Current reader.  Must call createReader first else returns null.
460    * @see #createReader()
461    */
462   public Reader getReader() {
463     return this.reader;
464   }
465 
466   /**
467    * @param evictOnClose whether to evict blocks belonging to this file
468    * @throws IOException
469    */
470   public synchronized void closeReader(boolean evictOnClose)
471       throws IOException {
472     if (this.reader != null) {
473       this.reader.close(evictOnClose);
474       this.reader = null;
475     }
476   }
477 
478   /**
479    * Delete this file
480    * @throws IOException
481    */
482   public void deleteReader() throws IOException {
483     closeReader(true);
484     this.fs.delete(getPath(), true);
485   }
486 
487   @Override
488   public String toString() {
489     return this.fileInfo.toString();
490   }
491 
492   /**
493    * @return a lengthy description of this StoreFile, suitable for debug output
494    */
495   public String toStringDetailed() {
496     StringBuilder sb = new StringBuilder();
497     sb.append(this.getPath().toString());
498     sb.append(", isReference=").append(isReference());
499     sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
500     if (isBulkLoadResult()) {
501       sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
502     } else {
503       sb.append(", seqid=").append(getMaxSequenceId());
504     }
505     sb.append(", majorCompaction=").append(isMajorCompaction());
506 
507     return sb.toString();
508   }
509 
510   public static class WriterBuilder {
511     private final Configuration conf;
512     private final CacheConfig cacheConf;
513     private final FileSystem fs;
514 
515     private KeyValue.KVComparator comparator = KeyValue.COMPARATOR;
516     private BloomType bloomType = BloomType.NONE;
517     private long maxKeyCount = 0;
518     private Path dir;
519     private Path filePath;
520     private InetSocketAddress[] favoredNodes;
521     private HFileContext fileContext;
522     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
523         FileSystem fs) {
524       this.conf = conf;
525       this.cacheConf = cacheConf;
526       this.fs = fs;
527     }
528 
529     /**
530      * Use either this method or {@link #withFilePath}, but not both.
531      * @param dir Path to column family directory. The directory is created if
532      *          it does not exist. The file is given a unique name within this
533      *          directory.
534      * @return this (for chained invocation)
535      */
536     public WriterBuilder withOutputDir(Path dir) {
537       Preconditions.checkNotNull(dir);
538       this.dir = dir;
539       return this;
540     }
541 
542     /**
543      * Use either this method or {@link #withOutputDir}, but not both.
544      * @param filePath the StoreFile path to write
545      * @return this (for chained invocation)
546      */
547     public WriterBuilder withFilePath(Path filePath) {
548       Preconditions.checkNotNull(filePath);
549       this.filePath = filePath;
550       return this;
551     }
552 
553     /**
554      * @param favoredNodes an array of favored nodes or possibly null
555      * @return this (for chained invocation)
556      */
557     public WriterBuilder withFavoredNodes(InetSocketAddress[] favoredNodes) {
558       this.favoredNodes = favoredNodes;
559       return this;
560     }
561 
562     public WriterBuilder withComparator(KeyValue.KVComparator comparator) {
563       Preconditions.checkNotNull(comparator);
564       this.comparator = comparator;
565       return this;
566     }
567 
568     public WriterBuilder withBloomType(BloomType bloomType) {
569       Preconditions.checkNotNull(bloomType);
570       this.bloomType = bloomType;
571       return this;
572     }
573 
574     /**
575      * @param maxKeyCount estimated maximum number of keys we expect to add
576      * @return this (for chained invocation)
577      */
578     public WriterBuilder withMaxKeyCount(long maxKeyCount) {
579       this.maxKeyCount = maxKeyCount;
580       return this;
581     }
582 
583     public WriterBuilder withFileContext(HFileContext fileContext) {
584       this.fileContext = fileContext;
585       return this;
586     }
587     /**
588      * Create a store file writer. The client is responsible for closing the
589      * file when done. If metadata is needed, add it BEFORE closing, using
590      * {@link Writer#appendMetadata}.
591      */
592     public Writer build() throws IOException {
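          // Exactly one of withOutputDir and withFilePath must have been called.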
593       if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
594         throw new IllegalArgumentException("Specify exactly one of parent " +
595             "directory and file path");
596       }
597 
598       if (dir == null) {
599         dir = filePath.getParent();
600       }
601 
602       if (!fs.exists(dir)) {
603         fs.mkdirs(dir);
604       }
605 
606       if (filePath == null) {
607         filePath = getUniqueFile(fs, dir);
608         if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
609           bloomType = BloomType.NONE;
610         }
611       }
612 
613       if (comparator == null) {
614         comparator = KeyValue.COMPARATOR;
615       }
616       return new Writer(fs, filePath, 
617           conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext);
618     }
619   }
620 
621   /**
622    * @param fs the filesystem the directory resides on
623    * @param dir Directory to create file in.
624    * @return random filename inside passed <code>dir</code>
625    */
626   public static Path getUniqueFile(final FileSystem fs, final Path dir)
627       throws IOException {
628     if (!fs.getFileStatus(dir).isDir()) {
629       throw new IOException("Expecting " + dir.toString() +
630         " to be a directory");
631     }
632     return new Path(dir, UUID.randomUUID().toString().replaceAll("-", ""));
633   }
634 
635   public Long getMinimumTimestamp() {
636     return (getReader().timeRangeTracker == null) ?
637         null :
638         getReader().timeRangeTracker.getMinimumTimestamp();
639   }
640 
641   /**
642    * Gets the approximate mid-point of this file that is optimal for use in splitting it.
643    * @param comparator Comparator used to compare KVs.
644    * @return The split point row, or null if splitting is not possible, or reader is null.
645    */
646   @SuppressWarnings("deprecation")
647   byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
648     if (this.reader == null) {
649       LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
650       return null;
651     }
652     // Get first, last, and mid keys.  The midkey is the key that starts the
653     // block in the middle of the hfile.  It has a column and timestamp.  We
654     // need to return just the row we want to split on as the midkey.
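        // Hypothetical illustration: with first/mid/last rows "a"/"m"/"z" the
        // split row is "m"; if every key shares one row, the midkey row matches
        // the first and last rows and null is returned below.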
655     byte [] midkey = this.reader.midkey();
656     if (midkey != null) {
657       KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
658       byte [] fk = this.reader.getFirstKey();
659       KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
660       byte [] lk = this.reader.getLastKey();
661       KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
662       // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
663       if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
664         if (LOG.isDebugEnabled()) {
665           LOG.debug("cannot split because midkey is the same as first or last row");
666         }
667         return null;
668       }
669       return mk.getRow();
670     }
671     return null;
672   }
673 
674   /**
675    * A StoreFile writer.  Use this to write HBase Store Files. It is package
676    * local because it is an implementation detail of the HBase regionserver.
677    */
678   public static class Writer implements Compactor.CellSink {
679     private final BloomFilterWriter generalBloomFilterWriter;
680     private final BloomFilterWriter deleteFamilyBloomFilterWriter;
681     private final BloomType bloomType;
682     private byte[] lastBloomKey;
683     private int lastBloomKeyOffset, lastBloomKeyLen;
684     private KVComparator kvComparator;
685     private KeyValue lastKv = null;
686     private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
687     private KeyValue lastDeleteFamilyKV = null;
688     private long deleteFamilyCnt = 0;
689 
690 
691     /** Checksum type */
692     protected ChecksumType checksumType;
693 
694     /** Bytes per Checksum */
695     protected int bytesPerChecksum;
696     
697     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
698     /* isTimeRangeTrackerSet keeps track of whether the timeRange has already
699      * been set. When flushing a memstore, we set the TimeRange and use this
700      * variable to indicate that it doesn't need to be calculated again while
701      * appending KeyValues.
702      * It is not set for compactions, where it is recalculated using only
703      * the appended KeyValues. */
704     boolean isTimeRangeTrackerSet = false;
705 
706     protected HFile.Writer writer;
707 
708     /**
709      * Creates an HFile.Writer that also writes helpful metadata.
710      * @param fs file system to write to
711      * @param path file name to create
712      * @param conf user configuration
713      * @param comparator key comparator
714      * @param bloomType bloom filter setting
715      * @param maxKeys the expected maximum number of keys to be added. Was used
716      *        for Bloom filter size in {@link HFile} format version 1.
717      * @param favoredNodes an array of favored DataNodes, possibly null
718      * @param fileContext the HFile context
719      * @throws IOException problem writing to FS
720      */
721     private Writer(FileSystem fs, Path path,
722         final Configuration conf,
723         CacheConfig cacheConf,
724         final KVComparator comparator, BloomType bloomType, long maxKeys,
725         InetSocketAddress[] favoredNodes, HFileContext fileContext) 
726             throws IOException {
727       writer = HFile.getWriterFactory(conf, cacheConf)
728           .withPath(fs, path)
729           .withComparator(comparator)
730           .withFavoredNodes(favoredNodes)
731           .withFileContext(fileContext)
732           .create();
733 
734       this.kvComparator = comparator;
735 
736       generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
737           conf, cacheConf, bloomType,
738           (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
739 
740       if (generalBloomFilterWriter != null) {
741         this.bloomType = bloomType;
742         if (LOG.isTraceEnabled()) LOG.trace("Bloom filter type for " + path + ": " +
743           this.bloomType + ", " + generalBloomFilterWriter.getClass().getSimpleName());
744       } else {
745         // Not using Bloom filters.
746         this.bloomType = BloomType.NONE;
747       }
748 
749       // initialize delete family Bloom filter when there is NO RowCol Bloom
750       // filter
751       if (this.bloomType != BloomType.ROWCOL) {
752         this.deleteFamilyBloomFilterWriter = BloomFilterFactory
753             .createDeleteBloomAtWrite(conf, cacheConf,
754                 (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
755       } else {
756         deleteFamilyBloomFilterWriter = null;
757       }
758       if (deleteFamilyBloomFilterWriter != null) {
759         if (LOG.isTraceEnabled()) LOG.trace("Delete Family Bloom filter type for " + path + ": "
760             + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
761       }
762     }
763 
764     /**
765      * Writes metadata.
766      * Call before {@link #close()} since it is written as metadata to this file.
767      * @param maxSequenceId Maximum sequence id.
768      * @param majorCompaction True if this file is product of a major compaction
769      * @throws IOException problem writing to FS
770      */
771     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
772     throws IOException {
773       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
774       writer.appendFileInfo(MAJOR_COMPACTION_KEY,
775           Bytes.toBytes(majorCompaction));
776       appendTrackedTimestampsToMetadata();
777     }
778 
779     /**
780      * Add the timestamp range and earliest put timestamp to the metadata.
781      */
782     public void appendTrackedTimestampsToMetadata() throws IOException {
783       appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
784       appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
785     }
786 
787     /**
788      * Set the TimeRangeTracker.
789      * @param trt the TimeRangeTracker to use
790      */
791     public void setTimeRangeTracker(final TimeRangeTracker trt) {
792       this.timeRangeTracker = trt;
793       isTimeRangeTrackerSet = true;
794     }
795 
796     /**
797      * Record the earliest Put timestamp.
798      *
799      * If the timeRangeTracker is not set,
800      * update the TimeRangeTracker to include the timestamp of this key.
801      * @param kv the KeyValue whose timestamp is examined
802      */
803     public void trackTimestamps(final KeyValue kv) {
804       if (KeyValue.Type.Put.getCode() == kv.getTypeByte()) {
805         earliestPutTs = Math.min(earliestPutTs, kv.getTimestamp());
806       }
807       if (!isTimeRangeTrackerSet) {
808         timeRangeTracker.includeTimestamp(kv);
809       }
810     }
811 
812     private void appendGeneralBloomfilter(final KeyValue kv) throws IOException {
813       if (this.generalBloomFilterWriter != null) {
814         // only add to the bloom filter on a new, unique key
815         boolean newKey = true;
816         if (this.lastKv != null) {
817           switch(bloomType) {
818           case ROW:
819             newKey = ! kvComparator.matchingRows(kv, lastKv);
820             break;
821           case ROWCOL:
822             newKey = ! kvComparator.matchingRowColumn(kv, lastKv);
823             break;
824           case NONE:
825             newKey = false;
826             break;
827           default:
828             throw new IOException("Invalid Bloom filter type: " + bloomType +
829                 " (ROW or ROWCOL expected)");
830           }
831         }
832         if (newKey) {
833           /*
834            * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
835            * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
836            *
837            * 2 Types of Filtering:
838            *  1. Row = Row
839            *  2. RowCol = Row + Qualifier
840            */
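              // Hypothetical illustration: for a cell with row "r1", family "f"
              // and qualifier "q1", ROW hashes just "r1" while ROWCOL hashes a
              // combined "r1"+"q1" key; the family is never part of the Bloom key.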
841           byte[] bloomKey;
842           int bloomKeyOffset, bloomKeyLen;
843 
844           switch (bloomType) {
845           case ROW:
846             bloomKey = kv.getRowArray();
847             bloomKeyOffset = kv.getRowOffset();
848             bloomKeyLen = kv.getRowLength();
849             break;
850           case ROWCOL:
851             // merge(row, qualifier)
852             // TODO: could save one buffer copy in case of compound Bloom
853             // filters when this involves creating a KeyValue
854             bloomKey = generalBloomFilterWriter.createBloomKey(kv.getRowArray(),
855                 kv.getRowOffset(), kv.getRowLength(), kv.getQualifierArray(),
856                 kv.getQualifierOffset(), kv.getQualifierLength());
857             bloomKeyOffset = 0;
858             bloomKeyLen = bloomKey.length;
859             break;
860           default:
861             throw new IOException("Invalid Bloom filter type: " + bloomType +
862                 " (ROW or ROWCOL expected)");
863           }
864           generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
865           if (lastBloomKey != null
866               && generalBloomFilterWriter.getComparator().compareFlatKey(bloomKey,
867                   bloomKeyOffset, bloomKeyLen, lastBloomKey,
868                   lastBloomKeyOffset, lastBloomKeyLen) <= 0) {
869             throw new IOException("Non-increasing Bloom keys: "
870                 + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen)
871                 + " after "
872                 + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset,
873                     lastBloomKeyLen));
874           }
875           lastBloomKey = bloomKey;
876           lastBloomKeyOffset = bloomKeyOffset;
877           lastBloomKeyLen = bloomKeyLen;
878           this.lastKv = kv;
879         }
880       }
881     }
882 
883     private void appendDeleteFamilyBloomFilter(final KeyValue kv)
884         throws IOException {
885       if (!kv.isDeleteFamily() && !kv.isDeleteFamilyVersion()) {
886         return;
887       }
888 
889       // increase the count of delete family cells in the store file
890       deleteFamilyCnt++;
891       if (null != this.deleteFamilyBloomFilterWriter) {
892         boolean newKey = true;
893         if (lastDeleteFamilyKV != null) {
894           newKey = !kvComparator.matchingRows(kv, lastDeleteFamilyKV);
895         }
896         if (newKey) {
897           this.deleteFamilyBloomFilterWriter.add(kv.getRowArray(),
898               kv.getRowOffset(), kv.getRowLength());
899           this.lastDeleteFamilyKV = kv;
900         }
901       }
902     }
903 
904     public void append(final KeyValue kv) throws IOException {
905       appendGeneralBloomfilter(kv);
906       appendDeleteFamilyBloomFilter(kv);
907       writer.append(kv);
908       trackTimestamps(kv);
909     }
910 
911     public Path getPath() {
912       return this.writer.getPath();
913     }
914 
915     boolean hasGeneralBloom() {
916       return this.generalBloomFilterWriter != null;
917     }
918 
919     /**
920      * For unit testing only.
921      *
922      * @return the Bloom filter used by this writer.
923      */
924     BloomFilterWriter getGeneralBloomWriter() {
925       return generalBloomFilterWriter;
926     }
927 
928     private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
929       boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
930       if (haveBloom) {
931         bfw.compactBloom();
932       }
933       return haveBloom;
934     }
935 
936     private boolean closeGeneralBloomFilter() throws IOException {
937       boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);
938 
939       // add the general Bloom filter writer and append file info
940       if (hasGeneralBloom) {
941         writer.addGeneralBloomFilter(generalBloomFilterWriter);
942         writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
943             Bytes.toBytes(bloomType.toString()));
944         if (lastBloomKey != null) {
945           writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
946               lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
947                   + lastBloomKeyLen));
948         }
949       }
950       return hasGeneralBloom;
951     }
952 
953     private boolean closeDeleteFamilyBloomFilter() throws IOException {
954       boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);
955 
956       // add the delete family Bloom filter writer
957       if (hasDeleteFamilyBloom) {
958         writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
959       }
960 
961       // append file info about the number of delete family kvs
962       // even if there is no delete family Bloom.
963       writer.appendFileInfo(DELETE_FAMILY_COUNT,
964           Bytes.toBytes(this.deleteFamilyCnt));
965 
966       return hasDeleteFamilyBloom;
967     }
968 
969     public void close() throws IOException {
970       boolean hasGeneralBloom = this.closeGeneralBloomFilter();
971       boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
972 
973       writer.close();
974 
975       // Log final Bloom filter statistics. This needs to be done after close()
976       // because compound Bloom filters might be finalized as part of closing.
977       if (StoreFile.LOG.isTraceEnabled()) {
978         StoreFile.LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
979           (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily was added to HFile " +
980           getPath());
981       }
982 
983     }
984 
985     public void appendFileInfo(byte[] key, byte[] value) throws IOException {
986       writer.appendFileInfo(key, value);
987     }
988 
989     /** For use in testing, e.g. {@link org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile}
990      */
991     HFile.Writer getHFileWriter() {
992       return writer;
993     }
994   }
995 
996   /**
997    * Reader for a StoreFile.
998    */
999   public static class Reader {
1000     static final Log LOG = LogFactory.getLog(Reader.class.getName());
1001 
1002     protected BloomFilter generalBloomFilter = null;
1003     protected BloomFilter deleteFamilyBloomFilter = null;
1004     protected BloomType bloomFilterType;
1005     private final HFile.Reader reader;
1006     protected TimeRangeTracker timeRangeTracker = null;
1007     protected long sequenceID = -1;
1008     private byte[] lastBloomKey;
1009     private long deleteFamilyCnt = -1;
1010 
1011     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
1012         throws IOException {
1013       reader = HFile.createReader(fs, path, cacheConf, conf);
1014       bloomFilterType = BloomType.NONE;
1015     }
1016 
1017     public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
1018         CacheConfig cacheConf, Configuration conf) throws IOException {
1019       reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
1020       bloomFilterType = BloomType.NONE;
1021     }
1022 
1023     /**
1024      * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
1025      */
1026     Reader() {
1027       this.reader = null;
1028     }
1029 
1030     public KVComparator getComparator() {
1031       return reader.getComparator();
1032     }
1033 
1034     /**
1035      * Get a scanner to scan over this StoreFile. Do not use
1036      * this overload if using this scanner for compactions.
1037      *
1038      * @param cacheBlocks should this scanner cache blocks?
1039      * @param pread use pread (for highly concurrent small readers)
1040      * @return a scanner
1041      */
1042     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1043                                                boolean pread) {
1044       return getStoreFileScanner(cacheBlocks, pread, false,
1045        // 0 is passed as the readpoint because this method is only used by
1046        // tests where the StoreFile is directly operated upon
1047         0);
1048     }
1049 
1050     /**
1051      * Get a scanner to scan over this StoreFile.
1052      *
1053      * @param cacheBlocks should this scanner cache blocks?
1054      * @param pread use pread (for highly concurrent small readers)
1055      * @param isCompaction is scanner being used for compaction?
          * @param readPt the MVCC read point at which the scan is performed
1056      * @return a scanner
1057      */
1058     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
1059                                                boolean pread,
1060                                                boolean isCompaction, long readPt) {
1061       return new StoreFileScanner(this,
1062                                  getScanner(cacheBlocks, pread, isCompaction),
1063                                  !isCompaction, reader.hasMVCCInfo(), readPt);
1064     }
1065 
1066     /**
1067      * Warning: Do not write further code which depends on this call. Instead
1068      * use getStoreFileScanner(), which returns a StoreFileScanner: the
1069      * preferred way to scan a store with higher level concepts.
1070      *
1071      * @param cacheBlocks should we cache the blocks?
1072      * @param pread use pread (for concurrent small readers)
1073      * @return the underlying HFileScanner
1074      */
1075     @Deprecated
1076     public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
1077       return getScanner(cacheBlocks, pread, false);
1078     }
1079 
1080     /**
1081      * Warning: Do not write further code which depends on this call. Instead
1082      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
1083      * which is the preferred way to scan a store with higher level concepts.
1084      *
1085      * @param cacheBlocks
1086      *          should we cache the blocks?
1087      * @param pread
1088      *          use pread (for concurrent small readers)
1089      * @param isCompaction
1090      *          is scanner being used for compaction?
1091      * @return the underlying HFileScanner
1092      */
1093     @Deprecated
1094     public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
1095         boolean isCompaction) {
1096       return reader.getScanner(cacheBlocks, pread, isCompaction);
1097     }
1098 
1099     public void close(boolean evictOnClose) throws IOException {
1100       reader.close(evictOnClose);
1101     }
1102 
1103     /**
1104      * Check if this storeFile may contain keys within the TimeRange that
1105      * have not expired (i.e. not older than oldestUnexpiredTS).
1106      * @param scan the current scan
1107      * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
1108      *          determined by the column family's TTL
1109      * @return false if queried keys definitely don't exist in this StoreFile
1110      */
1111     boolean passesTimerangeFilter(Scan scan, long oldestUnexpiredTS) {
1112       if (timeRangeTracker == null) {
1113         return true;
1114       } else {
1115         return timeRangeTracker.includesTimeRange(scan.getTimeRange()) &&
1116             timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
1117       }
1118     }
1119 
1120     /**
1121      * Checks whether the given scan passes the Bloom filter (if present). Only
1122      * checks Bloom filters for single-row or single-row-column scans. Bloom
1123      * filter checking for multi-gets is implemented as part of the store
1124      * scanner system (see {@link StoreFileScanner#seekExactly}) and uses
1125      * the lower-level API {@link #passesGeneralBloomFilter(byte[], int, int, byte[],
1126      * int, int)}.
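          *
          * <p>A sketch of a qualifying single-row ("get") check; {@code row},
          * {@code columns} and {@code reader} are placeholders supplied by the
          * caller, and a scan whose start row equals its stop row is treated
          * as a get:
          * <pre>
          * Scan get = new Scan(row, row);
          * boolean mayContain = reader.passesBloomFilter(get, columns);
          * </pre>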
1127      *
1128      * @param scan the scan specification. Used to determine the row, and to
1129      *          check whether this is a single-row ("get") scan.
1130      * @param columns the set of columns. Only used for row-column Bloom
1131      *          filters.
1132      * @return true if the scan with the given column set passes the Bloom
1133      *         filter, or if the Bloom filter is not applicable for the scan.
1134      *         False if the Bloom filter is applicable and the scan fails it.
1135      */
1136      boolean passesBloomFilter(Scan scan,
1137         final SortedSet<byte[]> columns) {
1138       // Multi-column non-get scans will use Bloom filters through the
1139       // lower-level API function that this function calls.
1140       if (!scan.isGetScan()) {
1141         return true;
1142       }
1143 
1144       byte[] row = scan.getStartRow();
1145       switch (this.bloomFilterType) {
1146         case ROW:
1147           return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0);
1148 
1149         case ROWCOL:
1150           if (columns != null && columns.size() == 1) {
1151             byte[] column = columns.first();
1152             return passesGeneralBloomFilter(row, 0, row.length, column, 0,
1153                 column.length);
1154           }
1155 
1156           // For multi-column queries the Bloom filter is checked from the
1157           // seekExact operation.
1158           return true;
1159 
1160         default:
1161           return true;
1162       }
1163     }
1164 
1165     public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
1166         int rowLen) {
1167       // Cache Bloom filter as a local variable in case it is set to null by
1168       // another thread on an IO error.
1169       BloomFilter bloomFilter = this.deleteFamilyBloomFilter;
1170 
1171       // Empty file or there is no delete family at all
1172       if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
1173         return false;
1174       }
1175 
1176       if (bloomFilter == null) {
1177         return true;
1178       }
1179 
1180       try {
1181         if (!bloomFilter.supportsAutoLoading()) {
1182           return true;
1183         }
1184         return bloomFilter.contains(row, rowOffset, rowLen, null);
1185       } catch (IllegalArgumentException e) {
1186         LOG.error("Bad Delete Family bloom filter data -- proceeding without",
1187             e);
1188         setDeleteFamilyBloomFilterFaulty();
1189       }
1190 
1191       return true;
1192     }
1193 
1194     /**
1195      * A method for checking Bloom filters. Called directly from
1196      * StoreFileScanner in case of a multi-column query.
1197      *
1198      * @param row the row bytes
1199      * @param rowOffset offset into the row array
1200      * @param rowLen length of the row
1201      * @param col the column qualifier bytes; null for row-only Bloom filters
1202      * @param colOffset offset into the qualifier array
1203      * @param colLen length of the qualifier
1204      * @return True if the key passes the filter or the filter cannot be consulted
1205      */
1206     public boolean passesGeneralBloomFilter(byte[] row, int rowOffset,
1207         int rowLen, byte[] col, int colOffset, int colLen) {
1208       // Cache Bloom filter as a local variable in case it is set to null by
1209       // another thread on an IO error.
1210       BloomFilter bloomFilter = this.generalBloomFilter;
1211       if (bloomFilter == null) {
1212         return true;
1213       }
1214 
1215       byte[] key;
1216       switch (bloomFilterType) {
1217         case ROW:
1218           if (col != null) {
1219             throw new RuntimeException("Row-only Bloom filter called with " +
1220                 "column specified");
1221           }
1222           if (rowOffset != 0 || rowLen != row.length) {
1223               throw new AssertionError("For row-only Bloom filters the row "
1224                   + "must occupy the whole array");
1225           }
1226           key = row;
1227           break;
1228 
1229         case ROWCOL:
1230           key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
1231               colOffset, colLen);
1232           break;
1233 
1234         default:
1235           return true;
1236       }
1237 
1238       // Empty file
1239       if (reader.getTrailer().getEntryCount() == 0)
1240         return false;
1241 
1242       try {
1243         boolean shouldCheckBloom;
1244         ByteBuffer bloom;
1245         if (bloomFilter.supportsAutoLoading()) {
1246           bloom = null;
1247           shouldCheckBloom = true;
1248         } else {
1249           bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
1250               true);
1251           shouldCheckBloom = bloom != null;
1252         }
1253 
1254         if (shouldCheckBloom) {
1255           boolean exists;
1256 
1257           // Whether the primary Bloom key is greater than the last Bloom key
1258           // from the file info. For row-column Bloom filters this is not yet
1259           // a sufficient condition to return false.
1260           boolean keyIsAfterLast = lastBloomKey != null
1261               && bloomFilter.getComparator().compareFlatKey(key, lastBloomKey) > 0;
1262 
1263           if (bloomFilterType == BloomType.ROWCOL) {
1264             // Since a Row Delete is essentially a DeleteFamily applied to all
1265             // columns, a file might be skipped if using row+col Bloom filter.
1266             // In order to ensure this file is included an additional check is
1267             // required looking only for a row bloom.
1268             byte[] rowBloomKey = bloomFilter.createBloomKey(row, 0, row.length,
1269                 null, 0, 0);
1270 
1271             if (keyIsAfterLast
1272                 && bloomFilter.getComparator().compareFlatKey(rowBloomKey,
1273                     lastBloomKey) > 0) {
1274               exists = false;
1275             } else {
1276               exists =
1277                   bloomFilter.contains(key, 0, key.length, bloom) ||
1278                   bloomFilter.contains(rowBloomKey, 0, rowBloomKey.length,
1279                       bloom);
1280             }
1281           } else {
1282             exists = !keyIsAfterLast
1283                 && bloomFilter.contains(key, 0, key.length, bloom);
1284           }
1285 
1286           return exists;
1287         }
1288       } catch (IOException e) {
1289         LOG.error("Error reading bloom filter data -- proceeding without",
1290             e);
1291         setGeneralBloomFilterFaulty();
1292       } catch (IllegalArgumentException e) {
1293         LOG.error("Bad bloom filter data -- proceeding without", e);
1294         setGeneralBloomFilterFaulty();
1295       }
1296 
1297       return true;
1298     }
1299 
1300     /**
1301      * Checks whether the given scan's rowkey range overlaps this storefile's key range.
1302      * @param scan the scan specification. Used to determine the rowkey range.
1303      * @return true if there is overlap, false otherwise
1304      */
1305     public boolean passesKeyRangeFilter(Scan scan) {
1306       if (this.getFirstKey() == null || this.getLastKey() == null) {
1307         // the file is empty
1308         return false;
1309       }
1310       if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
1311           && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
1312         return true;
1313       }
1314       KeyValue smallestScanKeyValue = scan.isReversed() ? KeyValue
1315           .createFirstOnRow(scan.getStopRow()) : KeyValue.createFirstOnRow(scan
1316           .getStartRow());
1317       KeyValue largestScanKeyValue = scan.isReversed() ? KeyValue
1318           .createLastOnRow(scan.getStartRow()) : KeyValue.createLastOnRow(scan
1319           .getStopRow());
1320       boolean nonOverLapping = (getComparator().compareFlatKey(
1321           this.getFirstKey(), largestScanKeyValue.getKey()) > 0 && !Bytes
1322           .equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
1323               HConstants.EMPTY_END_ROW))
1324           || getComparator().compareFlatKey(this.getLastKey(),
1325               smallestScanKeyValue.getKey()) < 0;
1326       return !nonOverLapping;
1327     }
1328 
1329     public Map<byte[], byte[]> loadFileInfo() throws IOException {
1330       Map<byte [], byte []> fi = reader.loadFileInfo();
1331 
1332       byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
1333       if (b != null) {
1334         bloomFilterType = BloomType.valueOf(Bytes.toString(b));
1335       }
1336 
1337       lastBloomKey = fi.get(LAST_BLOOM_KEY);
1338       byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
1339       if (cnt != null) {
1340         deleteFamilyCnt = Bytes.toLong(cnt);
1341       }
1342 
1343       return fi;
1344     }
1345 
1346     public void loadBloomfilter() {
1347       this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
1348       this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
1349     }
1350 
1351     private void loadBloomfilter(BlockType blockType) {
1352       try {
1353         if (blockType == BlockType.GENERAL_BLOOM_META) {
1354           if (this.generalBloomFilter != null)
1355             return; // Bloom has been loaded
1356 
1357           DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
1358           if (bloomMeta != null) {
1359             // sanity check for NONE Bloom filter
1360             if (bloomFilterType == BloomType.NONE) {
1361               throw new IOException(
1362                   "valid bloom filter type not found in FileInfo");
1363             } else {
1364               generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
1365                   reader);
1366               if (LOG.isTraceEnabled()) {
1367                 LOG.trace("Loaded " + bloomFilterType.toString() + " "
1368                   + generalBloomFilter.getClass().getSimpleName()
1369                   + " metadata for " + reader.getName());
1370               }
1371             }
1372           }
1373         } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1374           if (this.deleteFamilyBloomFilter != null)
1375             return; // Bloom has been loaded
1376 
1377           DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
1378           if (bloomMeta != null) {
1379             deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
1380                 bloomMeta, reader);
1381             LOG.info("Loaded Delete Family Bloom ("
1382                 + deleteFamilyBloomFilter.getClass().getSimpleName()
1383                 + ") metadata for " + reader.getName());
1384           }
1385         } else {
1386           throw new RuntimeException("Block Type: " + blockType.toString()
1387               + " is not supported for Bloom filter");
1388         }
1389       } catch (IOException e) {
1390         LOG.error("Error reading bloom filter meta for " + blockType
1391             + " -- proceeding without", e);
1392         setBloomFilterFaulty(blockType);
1393       } catch (IllegalArgumentException e) {
1394         LOG.error("Bad bloom filter meta " + blockType
1395             + " -- proceeding without", e);
1396         setBloomFilterFaulty(blockType);
1397       }
1398     }
1399 
1400     private void setBloomFilterFaulty(BlockType blockType) {
1401       if (blockType == BlockType.GENERAL_BLOOM_META) {
1402         setGeneralBloomFilterFaulty();
1403       } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1404         setDeleteFamilyBloomFilterFaulty();
1405       }
1406     }
1407 
1408     /**
1409      * The number of Bloom filter entries in this store file, or an estimate
1410      * thereof, if the Bloom filter is not loaded. This always returns an upper
1411      * bound of the number of Bloom filter entries.
1412      *
1413      * @return an estimate of the number of Bloom filter entries in this file
1414      */
1415     public long getFilterEntries() {
1416       return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
1417           : reader.getEntries();
1418     }
1419 
1420     public void setGeneralBloomFilterFaulty() {
1421       generalBloomFilter = null;
1422     }
1423 
1424     public void setDeleteFamilyBloomFilterFaulty() {
1425       this.deleteFamilyBloomFilter = null;
1426     }
1427 
1428     public byte[] getLastKey() {
1429       return reader.getLastKey();
1430     }
1431 
1432     public byte[] getLastRowKey() {
1433       return reader.getLastRowKey();
1434     }
1435 
1436     public byte[] midkey() throws IOException {
1437       return reader.midkey();
1438     }
1439 
1440     public long length() {
1441       return reader.length();
1442     }
1443 
1444     public long getTotalUncompressedBytes() {
1445       return reader.getTrailer().getTotalUncompressedBytes();
1446     }
1447 
1448     public long getEntries() {
1449       return reader.getEntries();
1450     }
1451 
1452     public long getDeleteFamilyCnt() {
1453       return deleteFamilyCnt;
1454     }
1455 
1456     public byte[] getFirstKey() {
1457       return reader.getFirstKey();
1458     }
1459 
1460     public long indexSize() {
1461       return reader.indexSize();
1462     }
1463 
1464     public BloomType getBloomFilterType() {
1465       return this.bloomFilterType;
1466     }
1467 
1468     public long getSequenceID() {
1469       return sequenceID;
1470     }
1471 
1472     public void setSequenceID(long sequenceID) {
1473       this.sequenceID = sequenceID;
1474     }
1475 
1476     BloomFilter getGeneralBloomFilter() {
1477       return generalBloomFilter;
1478     }
1479 
1480     long getUncompressedDataIndexSize() {
1481       return reader.getTrailer().getUncompressedDataIndexSize();
1482     }
1483 
1484     public long getTotalBloomSize() {
1485       if (generalBloomFilter == null)
1486         return 0;
1487       return generalBloomFilter.getByteSize();
1488     }
1489 
1490     public int getHFileVersion() {
1491       return reader.getTrailer().getMajorVersion();
1492     }
1493 
1494     public int getHFileMinorVersion() {
1495       return reader.getTrailer().getMinorVersion();
1496     }
1497 
1498     public HFile.Reader getHFileReader() {
1499       return reader;
1500     }
1501 
1502     void disableBloomFilterForTesting() {
1503       generalBloomFilter = null;
1504       this.deleteFamilyBloomFilter = null;
1505     }
1506 
1507     public long getMaxTimestamp() {
1508       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
1509     }
1510   }
1511 
1512   /**
1513    * Useful comparators for comparing StoreFiles.
1514    */
1515   public abstract static class Comparators {
1516     /**
1517      * Comparator that compares based on the sequence ids of the
1518      * StoreFiles. Bulk loads that did not request a seq id
1519      * are given a seq id of -1; thus, they are placed before all
1520      * non-bulk loads and before bulk loads with a sequence id. Among
1521      * these files, the size is used to determine the ordering, then
1522      * bulkLoadTime. If there are ties, the path name is used as a tie-breaker.
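          *
          * <p>Typical use, sorting a store's files oldest-to-newest (a sketch;
          * {@code storefiles} is any {@code List<StoreFile>}):
          * {@code Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID);}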
1523      */
1524     public static final Comparator<StoreFile> SEQ_ID =
1525       Ordering.compound(ImmutableList.of(
1526           Ordering.natural().onResultOf(new GetSeqId()),
1527           Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1528           Ordering.natural().onResultOf(new GetBulkTime()),
1529           Ordering.natural().onResultOf(new GetPathName())
1530       ));
1531 
1532     private static class GetSeqId implements Function<StoreFile, Long> {
1533       @Override
1534       public Long apply(StoreFile sf) {
1535         return sf.getMaxSequenceId();
1536       }
1537     }
1538 
1539     private static class GetFileSize implements Function<StoreFile, Long> {
1540       @Override
1541       public Long apply(StoreFile sf) {
1542         return sf.getReader().length();
1543       }
1544     }
1545 
1546     private static class GetBulkTime implements Function<StoreFile, Long> {
1547       @Override
1548       public Long apply(StoreFile sf) {
1549         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1550         return sf.getBulkLoadTimestamp();
1551       }
1552     }
1553 
1554     private static class GetPathName implements Function<StoreFile, String> {
1555       @Override
1556       public String apply(StoreFile sf) {
1557         return sf.getPath().getName();
1558       }
1559     }
1560   }
1561 }