/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.DataInput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
/**
 * A Store data file.  Stores usually have one or more of these files.  They
 * are produced by flushing the memstore to disk.  To create one, instantiate
 * a writer using {@link StoreFile.WriterBuilder} and append data.  Be sure to
 * add any metadata before calling close on the Writer (use the appendMetadata
 * convenience methods).  On close, a StoreFile is sitting in the Filesystem.
 * To refer to it, create a StoreFile instance passing filesystem and path.
 * To read, call {@link #createReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 *
 * The reason for this pattern, where a different instance is used for the
 * writer and the reader, is that we write once but read many times.
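 *
 * <p>A typical lifecycle, as an illustrative sketch (assumes {@code conf},
 * {@code cacheConf}, {@code fs}, a column family directory {@code cfDir},
 * an {@code HFileContext} and some cells are in scope):
 * <pre>{@code
 * StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 *     .withOutputDir(cfDir)                 // or withFilePath(...), not both
 *     .withBloomType(BloomType.ROW)
 *     .withMaxKeyCount(estimatedKeyCount)
 *     .withFileContext(fileContext)
 *     .build();
 * for (Cell cell : cells) {
 *   w.append(cell);
 * }
 * w.appendMetadata(maxSequenceId, false);   // metadata must precede close()
 * w.close();
 *
 * StoreFile sf = new StoreFile(fs, w.getPath(), conf, cacheConf, BloomType.ROW);
 * StoreFile.Reader r = sf.createReader();
 * // ... scan via r.getStoreFileScanner(...), then sf.closeReader(true)
 * }</pre>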
 */
@InterfaceAudience.LimitedPrivate("Coprocessor")
public class StoreFile {
  private static final Log LOG = LogFactory.getLog(StoreFile.class.getName());

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY =
      Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Minor compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY =
      Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT =
      Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  /** Key for the number of mob cells in metadata */
  public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");

  private final StoreFileInfo fileInfo;
  private final FileSystem fs;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // max of the MemstoreTS in the KV's in this store
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  public long getMaxMemstoreTS() {
    return maxMemstoreTS;
  }

  public void setMaxMemstoreTS(long maxMemstoreTS) {
    this.maxMemstoreTS = maxMemstoreTS;
  }

  // If true, this file was the product of a major compaction.  It's set
  // whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY =
    Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY =
    Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Map of the metadata entries in the corresponding HFile
   */
  private Map<byte[], byte[]> metadataMap;

  // StoreFile.Reader
  private volatile Reader reader;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Key for skipping resetting sequence id in metadata.
   * For bulk loaded hfiles, the scanner resets the cell seqId to the latest
   * one; if this metadata is set to true, the reset is skipped.
   */
  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a
   * substantial amount of RAM depending on the underlying files (10-20MB?).
   *
   * @param fs  The current file system to use.
   * @param p  The path of the file.
   * @param conf  The current configuration.
   * @param cacheConf  The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified
   *          by column family configuration. This may or may not be the same
   *          as the Bloom filter type actually present in the HFile, because
   *          column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @throws IOException When opening the reader fails.
   */
  public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
        final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
  }

  /**
   * Constructor, loads a reader and its indices, etc. May allocate a
   * substantial amount of RAM depending on the underlying files (10-20MB?).
   *
   * @param fs  The current file system to use.
   * @param fileInfo  The store file information.
   * @param conf  The current configuration.
   * @param cacheConf  The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified
   *          by column family configuration. This may or may not be the same
   *          as the Bloom filter type actually present in the HFile, because
   *          column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @throws IOException When opening the reader fails.
   */
  public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
      final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
    this.fs = fs;
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;

    if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
          "cfBloomType=" + cfBloomType + " (disabled in config)");
      this.cfBloomType = BloomType.NONE;
    }
  }

  /**
   * Clone a StoreFile; the new instance shares the underlying file info but
   * has no reader open.
   * @param other The StoreFile to clone from
   */
  public StoreFile(final StoreFile other) {
    this.fs = other.fs;
    this.fileInfo = other.fileInfo;
    this.cacheConf = other.cacheConf;
    this.cfBloomType = other.cfBloomType;
  }

  /**
   * @return the StoreFileInfo describing this StoreFile.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  /**
   * @return Path or null if this StoreFile was made with a Stream.
   */
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  /**
   * @return The qualified path of this StoreFile
   */
  public Path getQualifiedPath() {
    return this.fileInfo.getPath().makeQualified(fs);
  }

  /**
   * @return True if this is a StoreFile Reference; call after {@link #open()},
   * else you may get a wrong answer.
   */
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  /**
   * @return True if this file was made by a major compaction.
   */
  public boolean isMajorCompaction() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("This has not been set yet");
    }
    return this.majorCompaction.get();
  }

  /**
   * @return True if this file should not be part of a minor compaction.
   */
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  /**
   * @return This file's maximum edit sequence id.
   */
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  public long getModificationTimeStamp() throws IOException {
    return (fileInfo == null) ? 0 : fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy
   * @param key the metadata key to look up
   * @return value associated with the metadata key
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  /**
   * Return the largest memstoreTS found across all storefiles in
   * the given list. Store files that were created by a mapreduce
   * bulk load are ignored, as they do not correspond to any specific
   * put operation, and thus do not have a memstoreTS associated with them.
   * @return 0 if no non-bulk-load files are provided, or if this is a Store
   * that does not yet have any store files.
   */
  public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
    long max = 0;
    for (StoreFile sf : sfs) {
      if (!sf.isBulkLoadResult()) {
        max = Math.max(max, sf.getMaxMemstoreTS());
      }
    }
    return max;
  }

  /**
   * Return the highest sequence ID found across all storefiles in
   * the given list.
   * @param sfs the store files to examine
   * @return 0 if no store files are provided, or if this is a Store that
   * does not yet have any store files.
   */
  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
    long max = 0;
    for (StoreFile sf : sfs) {
      max = Math.max(max, sf.getMaxSequenceId());
    }
    return max;
  }

  /**
   * Check if this storefile was created by bulk load.
   * When an hfile is bulk loaded into HBase, we append
   * {@code '_SeqId_<id-when-loaded>'} to the hfile name, unless
   * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
   * explicitly turned off.
   * If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
   * is turned off, fall back to BULKLOAD_TIME_KEY.
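   * <p>For example, a (hypothetical) bulk loaded hfile named
   * {@code 2f5d2875e3f54cf5b8b6ca33e9a68f4e_SeqId_205_} was assigned
   * sequence id 205 at load time.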
   * @return true if this storefile was created by bulk load.
   */
  boolean isBulkLoadResult() {
    boolean bulkLoadedHFile = false;
    String fileName = this.getPath().getName();
    int startPos = fileName.indexOf("SeqId_");
    if (startPos != -1) {
      bulkLoadedHFile = true;
    }
    return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
  }

  /**
   * Return the timestamp at which this bulk load file was generated.
   */
  public long getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is
   * calculated when store file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens reader on this store file.  Called from {@link #createReader()}.
   * @return Reader for the store file.
   * @throws IOException
   * @see #closeReader(boolean)
   */
  private Reader open() throws IOException {
    if (this.reader != null) {
      throw new IllegalAccessError("Already open");
    }

    // Open the StoreFile.Reader
    this.reader = fileInfo.open(this.fs, this.cacheConf);

    // Load up indices and fileinfo. This also loads Bloom filter type.
    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

    // Read in our metadata.
    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if this is a half hfile, the top half has a sequence
      // number greater than the bottom half.  That's why we add one below.
      // It's done in case the two halves are ever merged back together
      // (rare).  Without it, on open of store, since store files are
      // distinguished by sequence id, the one half would subsume the other.
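      // Example: if both halves carry MAX_SEQ_ID=5 in their file info, the
      // bottom half reports 5 and the top half reports 6 after the bump below.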
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // Generate the sequenceId from the fileName.
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
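      // e.g. a file named "2f5d2875..._SeqId_205_" yields sequenceid = 205.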
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      // SKIP_RESET_SEQ_ID only applies to bulk loaded files.
      // In mob compaction, the hfile whose cells contain the paths of new mob
      // files is bulk loaded into hbase; these cells have the same seqIds as
      // the old ones. We do not want to assign new seqIds to them, since that
      // might make a mess of the visibility of cells that have the same row
      // key but different seqIds.
      this.reader.setSkipResetSeqId(isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)));
      this.reader.setBulkLoaded(true);
    }
    this.reader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so.
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = reader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.info("HFile Bloom filter type for "
            + reader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + reader.getHFileReader().getName());
    }

    // load delete family bloom filter
    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
      if (timerangeBytes != null) {
        this.reader.timeRangeTracker = new TimeRangeTracker();
        Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
      }
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.reader.timeRangeTracker = null;
    }
    return this.reader;
  }

  /**
   * @return Reader for StoreFile. Creates one if necessary.
   * @throws IOException
   */
  public Reader createReader() throws IOException {
    if (this.reader == null) {
      try {
        this.reader = open();
      } catch (IOException e) {
        try {
          this.closeReader(true);
        } catch (IOException ee) {
          // Ignore; we are rethrowing the original failure.
        }
        throw e;
      }
    }
    return this.reader;
  }

  /**
   * @return Current reader.  Must call createReader first, else returns null.
   * @see #createReader()
   */
  public Reader getReader() {
    return this.reader;
  }

  /**
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException
   */
  public synchronized void closeReader(boolean evictOnClose)
      throws IOException {
    if (this.reader != null) {
      this.reader.close(evictOnClose);
      this.reader = null;
    }
  }

  /**
   * Close the reader and delete this store file.
   * @throws IOException
   */
  public void deleteReader() throws IOException {
    closeReader(true);
    this.fs.delete(getPath(), true);
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  /**
   * @return a lengthy description of this StoreFile, suitable for debug output
   */
  public String toStringDetailed() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
    if (isBulkLoadResult()) {
      sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompaction());

    return sb.toString();
  }

  /**
   * Gets whether to skip resetting the sequence id for cells.
   * @param skipResetSeqId The byte-array-encoded boolean flag.
   * @return Whether to skip resetting the sequence id.
   */
  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
      return Bytes.toBoolean(skipResetSeqId);
    }
    return false;
  }

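  /**
   * Fluent builder for {@link Writer} instances; see the usage sketch in the
   * class comment above. Exactly one of {@link #withOutputDir} and
   * {@link #withFilePath} must be specified before calling {@link #build()}.
   */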
  public static class WriterBuilder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private CellComparator comparator = CellComparator.COMPARATOR;
    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;

    public WriterBuilder(Configuration conf, CacheConfig cacheConf,
        FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created
     *          if it does not exist. The file is given a unique name within
     *          this directory.
     * @return this (for chained invocation)
     */
    public WriterBuilder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public WriterBuilder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public WriterBuilder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public WriterBuilder withComparator(CellComparator comparator) {
      Preconditions.checkNotNull(comparator);
      this.comparator = comparator;
      return this;
    }

    public WriterBuilder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public WriterBuilder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public WriterBuilder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    /**
     * Create a store file writer. Client is responsible for closing the file
     * when done. Any metadata must be added BEFORE closing, using
     * {@link Writer#appendMetadata}.
     */
    public Writer build() throws IOException {
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory " +
            "or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        fs.mkdirs(dir);
      }

      if (filePath == null) {
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (comparator == null) {
        comparator = CellComparator.COMPARATOR;
      }
      return new Writer(fs, filePath,
          conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext);
    }
  }

  /**
   * @param fs the filesystem to create the file on
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
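   *         (a UUID with the dashes removed, e.g.
   *         {@code 3c0a4f0b8e1d4e9a9f1b2c3d4e5f6a7b})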
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir)
      throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() +
        " to be a directory");
    }
    return new Path(dir, UUID.randomUUID().toString().replaceAll("-", ""));
  }

  public Long getMinimumTimestamp() {
    return (getReader().timeRangeTracker == null) ?
        null :
        getReader().timeRangeTracker.getMinimumTimestamp();
  }

  /**
   * Gets the approximate mid-point of this file that is optimal for use in splitting it.
   * @param comparator Comparator used to compare KVs.
   * @return The split point row, or null if splitting is not possible, or reader is null.
   */
  @SuppressWarnings("deprecation")
  byte[] getFileSplitPoint(CellComparator comparator) throws IOException {
    if (this.reader == null) {
      LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
      return null;
    }
    // Get first, last, and mid keys.  Midkey is the key that starts the block
    // in the middle of the hfile.  It has a column and timestamp.  We need to
    // return just the row we want to split on as the midkey.
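    // Example: if the file spans rows "a" through "z" and the middle block
    // starts at ("m", cf:q, ts), the split point returned is the row "m".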
    Cell midkey = this.reader.midkey();
    if (midkey != null) {
      Cell firstKey = this.reader.getFirstKey();
      Cell lastKey = this.reader.getLastKey();
      // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
      if (comparator.compareRows(midkey, firstKey) == 0
          || comparator.compareRows(midkey, lastKey) == 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("cannot split because midkey is the same as first or last row");
        }
        return null;
      }
      return CellUtil.cloneRow(midkey);
    }
    return null;
  }

  /**
   * A StoreFile writer.  Use this to write HBase Store Files.  Although
   * public, it is an implementation detail of the HBase regionserver.
   */
  public static class Writer implements Compactor.CellSink {
    private final BloomFilterWriter generalBloomFilterWriter;
    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
    private final BloomType bloomType;
    private byte[] lastBloomKey;
    private int lastBloomKeyOffset, lastBloomKeyLen;
    private Cell lastCell = null;
    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    private Cell lastDeleteFamilyCell = null;
    private long deleteFamilyCnt = 0;

    /** Bytes per Checksum */
    protected int bytesPerChecksum;

    TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
    /* isTimeRangeTrackerSet keeps track of whether the timeRange has already
     * been set.  When flushing a memstore, we set the TimeRange and use this
     * variable to indicate that it doesn't need to be calculated again while
     * appending KeyValues.
     * It is not set during compactions, when it is recalculated using only
     * the appended KeyValues. */
    boolean isTimeRangeTrackerSet = false;

    protected HFile.Writer writer;
    private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;

    /**
     * Creates an HFile.Writer that also writes helpful metadata.
     * @param fs file system to write to
     * @param path file name to create
     * @param conf user configuration
     * @param comparator key comparator
     * @param bloomType bloom filter setting
     * @param maxKeys the expected maximum number of keys to be added. Was used
     *        for Bloom filter size in {@link HFile} format version 1.
     * @param favoredNodes an array of favored nodes or possibly null
     * @param fileContext The HFile context
     * @throws IOException problem writing to FS
     */
    private Writer(FileSystem fs, Path path,
        final Configuration conf,
        CacheConfig cacheConf,
        final CellComparator comparator, BloomType bloomType, long maxKeys,
        InetSocketAddress[] favoredNodes, HFileContext fileContext)
            throws IOException {
      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withComparator(comparator)
          .withFavoredNodes(favoredNodes)
          .withFileContext(fileContext)
          .create();

      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
          conf, cacheConf, bloomType,
          (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

      if (generalBloomFilterWriter != null) {
        this.bloomType = bloomType;
        if (this.bloomType == BloomType.ROWCOL) {
          lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue();
        }
        if (LOG.isTraceEnabled()) {
          LOG.trace("Bloom filter type for " + path + ": " + this.bloomType +
              ", " + generalBloomFilterWriter.getClass().getSimpleName());
        }
      } else {
        // Not using Bloom filters.
        this.bloomType = BloomType.NONE;
      }

      // initialize delete family Bloom filter when there is NO RowCol Bloom
      // filter
      if (this.bloomType != BloomType.ROWCOL) {
        this.deleteFamilyBloomFilterWriter = BloomFilterFactory
            .createDeleteBloomAtWrite(conf, cacheConf,
                (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      } else {
        deleteFamilyBloomFilterWriter = null;
      }
      if (deleteFamilyBloomFilterWriter != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Delete Family Bloom filter type for " + path + ": "
              + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
        }
      }
    }

    /**
     * Writes meta data.
     * Call before {@link #close()} since it's written as meta data to this file.
     * @param maxSequenceId Maximum sequence id.
     * @param majorCompaction True if this file is the product of a major compaction
     * @throws IOException problem writing to FS
     */
    public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
        throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY,
          Bytes.toBytes(majorCompaction));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Writes meta data.
     * Call before {@link #close()} since it's written as meta data to this file.
     * @param maxSequenceId Maximum sequence id.
     * @param majorCompaction True if this file is the product of a major compaction
     * @param mobCellsCount The number of mob cells.
     * @throws IOException problem writing to FS
     */
    public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
        final long mobCellsCount) throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
      writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Add the tracked time range and earliest put timestamp to metadata
     */
    public void appendTrackedTimestampsToMetadata() throws IOException {
      appendFileInfo(TIMERANGE_KEY, WritableUtils.toByteArray(timeRangeTracker));
      appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
    }

    /**
     * Set TimeRangeTracker
     * @param trt the TimeRangeTracker to use
     */
    public void setTimeRangeTracker(final TimeRangeTracker trt) {
      this.timeRangeTracker = trt;
      isTimeRangeTrackerSet = true;
    }

    /**
     * Record the earliest Put timestamp.
     *
     * If the timeRangeTracker is not set,
     * update TimeRangeTracker to include the timestamp of this key
     * @param cell the cell whose timestamp is tracked
     */
    public void trackTimestamps(final Cell cell) {
      if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
        earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
      }
      if (!isTimeRangeTrackerSet) {
        timeRangeTracker.includeTimestamp(cell);
      }
    }

    private void appendGeneralBloomfilter(final Cell cell) throws IOException {
      if (this.generalBloomFilterWriter != null) {
        // only add to the bloom filter on a new, unique key
        boolean newKey = true;
        if (this.lastCell != null) {
          switch (bloomType) {
          case ROW:
            newKey = !CellUtil.matchingRows(cell, lastCell);
            break;
          case ROWCOL:
            newKey = !CellUtil.matchingRowColumn(cell, lastCell);
            break;
          case NONE:
            newKey = false;
            break;
          default:
            throw new IOException("Invalid Bloom filter type: " + bloomType +
                " (ROW or ROWCOL expected)");
          }
        }
        if (newKey) {
          /*
           * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
           * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
           *
           * 2 Types of Filtering:
           *  1. Row = Row
           *  2. RowCol = Row + Qualifier
           */
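          // Example: for ROWCOL and a cell (row="r1", family="f", qual="q1"),
          // the Bloom key built below is the key of a first-on-row KeyValue
          // made from row "r1" and qualifier "q1", with the family left empty.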
          byte[] bloomKey = null;
          // Used with ROW_COL bloom
          KeyValue bloomKeyKV = null;
          int bloomKeyOffset, bloomKeyLen;

          switch (bloomType) {
          case ROW:
            bloomKey = cell.getRowArray();
            bloomKeyOffset = cell.getRowOffset();
            bloomKeyLen = cell.getRowLength();
            break;
          case ROWCOL:
            // merge(row, qualifier)
            // TODO: could save one buffer copy in case of compound Bloom
            // filters when this involves creating a KeyValue
            bloomKeyKV = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength(),
                HConstants.EMPTY_BYTE_ARRAY, 0, 0, cell.getQualifierArray(),
                cell.getQualifierOffset(),
                cell.getQualifierLength());
            bloomKey = bloomKeyKV.getBuffer();
            bloomKeyOffset = bloomKeyKV.getKeyOffset();
            bloomKeyLen = bloomKeyKV.getKeyLength();
            break;
          default:
            throw new IOException("Invalid Bloom filter type: " + bloomType +
                " (ROW or ROWCOL expected)");
          }
          generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
          if (lastBloomKey != null) {
            int res = 0;
            // hbase:meta does not have blooms. So we need not have special interpretation
            // of the hbase:meta cells.  We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
            if (bloomType == BloomType.ROW) {
              res = Bytes.BYTES_RAWCOMPARATOR.compare(bloomKey, bloomKeyOffset, bloomKeyLen,
                  lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen);
            } else {
              // TODO: Caching of kv components becomes important in these cases
              res = CellComparator.COMPARATOR.compare(bloomKeyKV, lastBloomKeyOnlyKV);
            }
            if (res <= 0) {
              throw new IOException("Non-increasing Bloom keys: "
                  + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen) + " after "
                  + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen));
            }
          }
          lastBloomKey = bloomKey;
          lastBloomKeyOffset = bloomKeyOffset;
          lastBloomKeyLen = bloomKeyLen;
          if (bloomType == BloomType.ROWCOL) {
            lastBloomKeyOnlyKV.setKey(bloomKey, bloomKeyOffset, bloomKeyLen);
          }
          this.lastCell = cell;
        }
      }
    }

    private void appendDeleteFamilyBloomFilter(final Cell cell)
        throws IOException {
      if (!CellUtil.isDeleteFamily(cell) && !CellUtil.isDeleteFamilyVersion(cell)) {
        return;
      }

      // increase the number of delete family in the store file
      deleteFamilyCnt++;
      if (null != this.deleteFamilyBloomFilterWriter) {
        boolean newKey = true;
        if (lastDeleteFamilyCell != null) {
          // hbase:meta does not have blooms. So we need not have special interpretation
          // of the hbase:meta cells
          newKey = !CellUtil.matchingRows(cell, lastDeleteFamilyCell);
        }
        if (newKey) {
          this.deleteFamilyBloomFilterWriter.add(cell.getRowArray(),
              cell.getRowOffset(), cell.getRowLength());
          this.lastDeleteFamilyCell = cell;
        }
      }
    }

    public void append(final Cell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }

    public Path getPath() {
      return this.writer.getPath();
    }

    public boolean hasGeneralBloom() {
      return this.generalBloomFilterWriter != null;
    }

    /**
     * For unit testing only.
     *
     * @return the Bloom filter used by this writer.
     */
    BloomFilterWriter getGeneralBloomWriter() {
      return generalBloomFilterWriter;
    }

    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
      if (haveBloom) {
        bfw.compactBloom();
      }
      return haveBloom;
    }

    private boolean closeGeneralBloomFilter() throws IOException {
      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

      // add the general Bloom filter writer and append file info
      if (hasGeneralBloom) {
        writer.addGeneralBloomFilter(generalBloomFilterWriter);
        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
            Bytes.toBytes(bloomType.toString()));
        if (lastBloomKey != null) {
          writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
              lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
                  + lastBloomKeyLen));
        }
      }
      return hasGeneralBloom;
    }

    private boolean closeDeleteFamilyBloomFilter() throws IOException {
      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

      // add the delete family Bloom filter writer
      if (hasDeleteFamilyBloom) {
        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
      }

      // append file info about the number of delete family kvs
      // even if there is no delete family Bloom.
      writer.appendFileInfo(DELETE_FAMILY_COUNT,
          Bytes.toBytes(this.deleteFamilyCnt));

      return hasDeleteFamilyBloom;
    }

    public void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

      writer.close();

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      if (StoreFile.LOG.isTraceEnabled()) {
        StoreFile.LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
          (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " +
          getPath());
      }
    }

    public void appendFileInfo(byte[] key, byte[] value) throws IOException {
      writer.appendFileInfo(key, value);
    }

    /**
     * For use in testing, e.g.
     * {@link org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile}.
     */
    HFile.Writer getHFileWriter() {
      return writer;
    }
  }

  /**
   * Reader for a StoreFile.
   */
  public static class Reader {
    private static final Log LOG = LogFactory.getLog(Reader.class.getName());

    protected BloomFilter generalBloomFilter = null;
    protected BloomFilter deleteFamilyBloomFilter = null;
    protected BloomType bloomFilterType;
    private final HFile.Reader reader;
    protected TimeRangeTracker timeRangeTracker = null;
    protected long sequenceID = -1;
    private byte[] lastBloomKey;
    private long deleteFamilyCnt = -1;
    private boolean bulkLoadResult = false;
    private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
    private boolean skipResetSeqId = true;

    public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
        throws IOException {
      reader = HFile.createReader(fs, path, cacheConf, conf);
      bloomFilterType = BloomType.NONE;
    }

    public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
        CacheConfig cacheConf, Configuration conf) throws IOException {
      reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
      bloomFilterType = BloomType.NONE;
    }

    /**
     * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
     */
    Reader() {
      this.reader = null;
    }

    public CellComparator getComparator() {
      return reader.getComparator();
    }

    /**
     * Get a scanner to scan over this StoreFile. Do not use
     * this overload if using this scanner for compactions.
     *
     * @param cacheBlocks should this scanner cache blocks?
     * @param pread use pread (for highly concurrent small readers)
     * @return a scanner
     */
    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                               boolean pread) {
      return getStoreFileScanner(cacheBlocks, pread, false,
        // 0 is passed as the readpoint because this method is only used by
        // tests, where the StoreFile is operated upon directly
        0);
    }

    /**
     * Get a scanner to scan over this StoreFile.
     *
     * @param cacheBlocks should this scanner cache blocks?
     * @param pread use pread (for highly concurrent small readers)
     * @param isCompaction is scanner being used for compaction?
     * @param readPt the MVCC read point to use
     * @return a scanner
     */
    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                               boolean pread,
                                               boolean isCompaction, long readPt) {
      return new StoreFileScanner(this,
                                 getScanner(cacheBlocks, pread, isCompaction),
                                 !isCompaction, reader.hasMVCCInfo(), readPt);
    }

    /**
     * Warning: Do not write further code which depends on this call. Instead
     * use getStoreFileScanner() which uses the StoreFileScanner class/interface
     * which is the preferred way to scan a store with higher level concepts.
     *
     * @param cacheBlocks should we cache the blocks?
     * @param pread use pread (for concurrent small readers)
     * @return the underlying HFileScanner
     */
    @Deprecated
    public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
      return getScanner(cacheBlocks, pread, false);
    }

    /**
     * Warning: Do not write further code which depends on this call. Instead
     * use getStoreFileScanner() which uses the StoreFileScanner class/interface
     * which is the preferred way to scan a store with higher level concepts.
     *
     * @param cacheBlocks
     *          should we cache the blocks?
     * @param pread
     *          use pread (for concurrent small readers)
     * @param isCompaction
     *          is scanner being used for compaction?
     * @return the underlying HFileScanner
     */
    @Deprecated
    public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
        boolean isCompaction) {
      return reader.getScanner(cacheBlocks, pread, isCompaction);
    }

    public void close(boolean evictOnClose) throws IOException {
      reader.close(evictOnClose);
    }

    /**
     * Check if this storeFile may contain keys within the TimeRange that
     * have not expired (i.e. not older than oldestUnexpiredTS).
     * @param scan the current scan
     * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
     *          determined by the column family's TTL
     * @return false if queried keys definitely don't exist in this StoreFile
     */
    boolean passesTimerangeFilter(Scan scan, long oldestUnexpiredTS) {
      if (timeRangeTracker == null) {
        return true;
      } else {
        return timeRangeTracker.includesTimeRange(scan.getTimeRange()) &&
            timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
      }
    }

    /**
     * Checks whether the given scan passes the Bloom filter (if present). Only
     * checks Bloom filters for single-row or single-row-column scans. Bloom
     * filter checking for multi-gets is implemented as part of the store
     * scanner system (see {@link StoreFileScanner#seekExactly}) and uses
     * the lower-level API {@link #passesGeneralBloomFilter(byte[], int, int, byte[],
     * int, int)}.
     *
     * @param scan the scan specification. Used to determine the row, and to
     *          check whether this is a single-row ("get") scan.
     * @param columns the set of columns. Only used for row-column Bloom
     *          filters.
     * @return true if the scan with the given column set passes the Bloom
     *         filter, or if the Bloom filter is not applicable for the scan.
     *         False if the Bloom filter is applicable and the scan fails it.
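     *
     * <p>A minimal usage sketch (hypothetical caller; assumes a single-row
     * get-style {@code scan} and its {@code columns} set are in scope):
     * <pre>{@code
     * if (!reader.passesBloomFilter(scan, columns)) {
     *   // queried row/column is definitely absent from this file; skip it
     * }
     * }</pre>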
     */
    boolean passesBloomFilter(Scan scan,
        final SortedSet<byte[]> columns) {
      // Multi-column non-get scans will use Bloom filters through the
      // lower-level API function that this function calls.
      if (!scan.isGetScan()) {
        return true;
      }

      byte[] row = scan.getStartRow();
      switch (this.bloomFilterType) {
        case ROW:
          return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0);

        case ROWCOL:
          if (columns != null && columns.size() == 1) {
            byte[] column = columns.first();
            return passesGeneralBloomFilter(row, 0, row.length, column, 0,
                column.length);
          }

          // For multi-column queries the Bloom filter is checked from the
          // seekExact operation.
          return true;

        default:
          return true;
      }
    }

    public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
        int rowLen) {
      // Cache Bloom filter as a local variable in case it is set to null by
      // another thread on an IO error.
      BloomFilter bloomFilter = this.deleteFamilyBloomFilter;

      // Empty file or there is no delete family at all
      if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
        return false;
      }

      if (bloomFilter == null) {
        return true;
      }

      try {
        if (!bloomFilter.supportsAutoLoading()) {
          return true;
        }
        return bloomFilter.contains(row, rowOffset, rowLen, null);
      } catch (IllegalArgumentException e) {
        LOG.error("Bad Delete Family bloom filter data -- proceeding without",
            e);
        setDeleteFamilyBloomFilterFaulty();
      }

      return true;
    }

    /**
     * A method for checking Bloom filters. Called directly from
     * StoreFileScanner in case of a multi-column query.
     *
     * @param row the row to check
     * @param rowOffset offset of the row in the array
     * @param rowLen length of the row
     * @param col the column to check, or null for a row-only check
     * @param colOffset offset of the column in the array
     * @param colLen length of the column
     * @return True if passes
     */
    public boolean passesGeneralBloomFilter(byte[] row, int rowOffset,
        int rowLen, byte[] col, int colOffset, int colLen) {
      // Cache Bloom filter as a local variable in case it is set to null by
      // another thread on an IO error.
      BloomFilter bloomFilter = this.generalBloomFilter;
      if (bloomFilter == null) {
        return true;
      }

      // Used in ROW bloom
      byte[] key = null;
      // Used in ROW_COL bloom
      KeyValue kvKey = null;
      switch (bloomFilterType) {
        case ROW:
          if (col != null) {
            throw new RuntimeException("Row-only Bloom filter called with " +
                "column specified");
          }
          if (rowOffset != 0 || rowLen != row.length) {
            throw new AssertionError("For row-only Bloom filters the row "
                + "must occupy the whole array");
          }
          key = row;
          break;

        case ROWCOL:
          kvKey = KeyValueUtil.createFirstOnRow(row, rowOffset, rowLen,
              HConstants.EMPTY_BYTE_ARRAY, 0, 0, col, colOffset,
              colLen);
          break;

        default:
          return true;
      }

      // Empty file
      if (reader.getTrailer().getEntryCount() == 0) {
        return false;
      }
      HFileBlock bloomBlock = null;
      try {
        boolean shouldCheckBloom;
        ByteBuff bloom;
        if (bloomFilter.supportsAutoLoading()) {
          bloom = null;
          shouldCheckBloom = true;
        } else {
          bloomBlock = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, true);
          bloom = bloomBlock.getBufferWithoutHeader();
          shouldCheckBloom = bloom != null;
        }

        if (shouldCheckBloom) {
          boolean exists;

          // Whether the primary Bloom key is greater than the last Bloom key
          // from the file info. For row-column Bloom filters this is not yet
          // a sufficient condition to return false.
          boolean keyIsAfterLast = (lastBloomKey != null);
          // hbase:meta does not have blooms. So we need not have special interpretation
          // of the hbase:meta cells.  We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
          if (keyIsAfterLast) {
            if (bloomFilterType == BloomType.ROW) {
              keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0);
            } else {
              keyIsAfterLast = (CellComparator.COMPARATOR.compare(kvKey, lastBloomKeyOnlyKV)) > 0;
            }
          }

          if (bloomFilterType == BloomType.ROWCOL) {
            // Since a Row Delete is essentially a DeleteFamily applied to all
            // columns, a file might be skipped if using row+col Bloom filter.
            // In order to ensure this file is included an additional check is
            // required looking only for a row bloom.
            KeyValue rowBloomKey = KeyValueUtil.createFirstOnRow(row, rowOffset, rowLen,
                HConstants.EMPTY_BYTE_ARRAY, 0, 0, HConstants.EMPTY_BYTE_ARRAY, 0, 0);
            // hbase:meta does not have blooms. So we need not have special interpretation
            // of the hbase:meta cells.  We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
            if (keyIsAfterLast
                && (CellComparator.COMPARATOR.compare(rowBloomKey, lastBloomKeyOnlyKV)) > 0) {
              exists = false;
            } else {
              exists =
                  bloomFilter.contains(kvKey, bloom) ||
                  bloomFilter.contains(rowBloomKey, bloom);
            }
          } else {
            exists = !keyIsAfterLast
                && bloomFilter.contains(key, 0, key.length, bloom);
          }

          return exists;
        }
      } catch (IOException e) {
        LOG.error("Error reading bloom filter data -- proceeding without",
            e);
        setGeneralBloomFilterFaulty();
      } catch (IllegalArgumentException e) {
        LOG.error("Bad bloom filter data -- proceeding without", e);
        setGeneralBloomFilterFaulty();
      } finally {
        // Return the bloom block so that its ref count can be decremented.
        reader.returnBlock(bloomBlock);
      }
      return true;
    }

1399     /**
1400      * Checks whether the given scan's rowkey range overlaps the key range of this storefile.
1401      * @param scan the scan specification. Used to determine the rowkey range.
1402      * @return true if there is overlap, false otherwise
1403      */
1404     public boolean passesKeyRangeFilter(Scan scan) {
1405       if (this.getFirstKey() == null || this.getLastKey() == null) {
1406         // the file is empty
1407         return false;
1408       }
1409       if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
1410           && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
1411         return true;
1412       }
1413       byte[] smallestScanRow = scan.isReversed() ? scan.getStopRow() : scan.getStartRow();
1414       byte[] largestScanRow = scan.isReversed() ? scan.getStartRow() : scan.getStopRow();
1415       Cell firstKeyKV = this.getFirstKey();
1416       Cell lastKeyKV = this.getLastKey();
1417       boolean nonOverLapping =
1418           (getComparator().compareRows(firstKeyKV, largestScanRow, 0, largestScanRow.length) > 0
1419               && !Bytes.equals(
1420                   scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
1421                   HConstants.EMPTY_END_ROW))
1422           || getComparator().compareRows(lastKeyKV, smallestScanRow, 0, smallestScanRow.length) < 0;
1423       return !nonOverLapping;
1424     }
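    // A minimal usage sketch (editorial; the row names are hypothetical): a
    // scan bounded on both ends is tested against this file's first and last
    // keys before any scanner is opened.
    //
    //   Scan scan = new Scan(Bytes.toBytes("row-0100"), Bytes.toBytes("row-0200"));
    //   if (!reader.passesKeyRangeFilter(scan)) {
    //     // the scan's row range cannot overlap this file; skip it entirely
    //   }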
1425 
1426     public Map<byte[], byte[]> loadFileInfo() throws IOException {
1427       Map<byte [], byte []> fi = reader.loadFileInfo();
1428 
1429       byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
1430       if (b != null) {
1431         bloomFilterType = BloomType.valueOf(Bytes.toString(b));
1432       }
1433 
1434       lastBloomKey = fi.get(LAST_BLOOM_KEY);
1435       if (bloomFilterType == BloomType.ROWCOL && lastBloomKey != null) {
1436         lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue(lastBloomKey, 0, lastBloomKey.length);
1437       }
1438       byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
1439       if (cnt != null) {
1440         deleteFamilyCnt = Bytes.toLong(cnt);
1441       }
1442 
1443       return fi;
1444     }
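    // A minimal usage sketch (editorial): the returned map also exposes raw
    // file-info entries beyond the ones decoded above, e.g. the time-range
    // entry written under TIMERANGE_KEY (defined elsewhere in this class):
    //
    //   Map<byte[], byte[]> fi = reader.loadFileInfo();
    //   byte[] timeRangeBytes = fi.get(TIMERANGE_KEY);
    //   if (timeRangeBytes != null) {
    //     // decode with Writables, as is done when the store file is opened
    //   }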
1445 
1446     public void loadBloomfilter() {
1447       this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
1448       this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
1449     }
1450 
1451     private void loadBloomfilter(BlockType blockType) {
1452       try {
1453         if (blockType == BlockType.GENERAL_BLOOM_META) {
1454           if (this.generalBloomFilter != null)
1455             return; // Bloom has been loaded
1456 
1457           DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
1458           if (bloomMeta != null) {
1459             // sanity check for NONE Bloom filter
1460             if (bloomFilterType == BloomType.NONE) {
1461               throw new IOException(
1462                   "valid bloom filter type not found in FileInfo");
1463             } else {
1464               generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
1465                   reader);
1466               if (LOG.isTraceEnabled()) {
1467                 LOG.trace("Loaded " + bloomFilterType.toString() + " "
1468                   + generalBloomFilter.getClass().getSimpleName()
1469                   + " metadata for " + reader.getName());
1470               }
1471             }
1472           }
1473         } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1474           if (this.deleteFamilyBloomFilter != null)
1475             return; // Bloom has been loaded
1476 
1477           DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
1478           if (bloomMeta != null) {
1479             deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
1480                 bloomMeta, reader);
1481             LOG.info("Loaded Delete Family Bloom ("
1482                 + deleteFamilyBloomFilter.getClass().getSimpleName()
1483                 + ") metadata for " + reader.getName());
1484           }
1485         } else {
1486           throw new RuntimeException("Block Type: " + blockType.toString()
1487               + " is not supported for Bloom filter");
1488         }
1489       } catch (IOException e) {
1490         LOG.error("Error reading bloom filter meta for " + blockType
1491             + " -- proceeding without", e);
1492         setBloomFilterFaulty(blockType);
1493       } catch (IllegalArgumentException e) {
1494         LOG.error("Bad bloom filter meta " + blockType
1495             + " -- proceeding without", e);
1496         setBloomFilterFaulty(blockType);
1497       }
1498     }
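    // A minimal usage sketch (editorial): after the lazy loads above succeed,
    // the Bloom statistics getters further down return loaded values.
    //
    //   reader.loadBloomfilter();                     // loads both Bloom metas
    //   long entries = reader.getFilterEntries();     // exact key count once loaded
    //   long bloomBytes = reader.getTotalBloomSize(); // 0 if absent or faulty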
1499 
1500     private void setBloomFilterFaulty(BlockType blockType) {
1501       if (blockType == BlockType.GENERAL_BLOOM_META) {
1502         setGeneralBloomFilterFaulty();
1503       } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1504         setDeleteFamilyBloomFilterFaulty();
1505       }
1506     }
1507 
1508     /**
1509      * The number of Bloom filter entries in this store file, or an estimate
1510      * thereof if the Bloom filter is not loaded. The result is always an
1511      * upper bound on the true number of entries.
1512      *
1513      * @return an upper-bound estimate of the number of Bloom filter entries in this file
1514      */
1515     public long getFilterEntries() {
1516       return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
1517           : reader.getEntries();
1518     }
1519 
1520     public void setGeneralBloomFilterFaulty() {
1521       generalBloomFilter = null;
1522     }
1523 
1524     public void setDeleteFamilyBloomFilterFaulty() {
1525       this.deleteFamilyBloomFilter = null;
1526     }
1527 
1528     public Cell getLastKey() {
1529       return reader.getLastKey();
1530     }
1531 
1532     public byte[] getLastRowKey() {
1533       return reader.getLastRowKey();
1534     }
1535 
1536     public Cell midkey() throws IOException {
1537       return reader.midkey();
1538     }
1539 
1540     public long length() {
1541       return reader.length();
1542     }
1543 
1544     public long getTotalUncompressedBytes() {
1545       return reader.getTrailer().getTotalUncompressedBytes();
1546     }
1547 
1548     public long getEntries() {
1549       return reader.getEntries();
1550     }
1551 
1552     public long getDeleteFamilyCnt() {
1553       return deleteFamilyCnt;
1554     }
1555 
1556     public Cell getFirstKey() {
1557       return reader.getFirstKey();
1558     }
1559 
1560     public long indexSize() {
1561       return reader.indexSize();
1562     }
1563 
1564     public BloomType getBloomFilterType() {
1565       return this.bloomFilterType;
1566     }
1567 
1568     public long getSequenceID() {
1569       return sequenceID;
1570     }
1571 
1572     public void setSequenceID(long sequenceID) {
1573       this.sequenceID = sequenceID;
1574     }
1575 
1576     public void setBulkLoaded(boolean bulkLoadResult) {
1577       this.bulkLoadResult = bulkLoadResult;
1578     }
1579 
1580     public boolean isBulkLoaded() {
1581       return this.bulkLoadResult;
1582     }
1583 
1584     BloomFilter getGeneralBloomFilter() {
1585       return generalBloomFilter;
1586     }
1587 
1588     long getUncompressedDataIndexSize() {
1589       return reader.getTrailer().getUncompressedDataIndexSize();
1590     }
1591 
1592     public long getTotalBloomSize() {
1593       if (generalBloomFilter == null)
1594         return 0;
1595       return generalBloomFilter.getByteSize();
1596     }
1597 
1598     public int getHFileVersion() {
1599       return reader.getTrailer().getMajorVersion();
1600     }
1601 
1602     public int getHFileMinorVersion() {
1603       return reader.getTrailer().getMinorVersion();
1604     }
1605 
1606     public HFile.Reader getHFileReader() {
1607       return reader;
1608     }
1609 
1610     void disableBloomFilterForTesting() {
1611       generalBloomFilter = null;
1612       this.deleteFamilyBloomFilter = null;
1613     }
1614 
1615     public long getMaxTimestamp() {
1616       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
1617     }
1618 
1619     boolean isSkipResetSeqId() {
1620       return skipResetSeqId;
1621     }
1622 
1623     void setSkipResetSeqId(boolean skipResetSeqId) {
1624       this.skipResetSeqId = skipResetSeqId;
1625     }
1626   }
1627 
1628   /**
1629    * Useful comparators for comparing StoreFiles.
1630    */
1631   public abstract static class Comparators {
1632     /**
1633      * Comparator that orders StoreFiles by their sequence ids.
1634      * Bulk loads that did not request a seq id are given a seq id of -1;
1635      * thus they are placed before all non-bulk loads and before bulk loads
1636      * that do carry a sequence id. Among files with equal seq ids, larger
1637      * files sort first, then earlier bulk-load times; the path name is the
1638      * final tie-breaker.
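     *
     * <p>A minimal usage sketch (editorial; the file list is hypothetical):
     * <pre>
     * List&lt;StoreFile&gt; files = ...; // candidate store files
     * Collections.sort(files, StoreFile.Comparators.SEQ_ID);
     * // files is now ordered oldest (lowest seq id) first
     * </pre>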
1639      */
1640     public static final Comparator<StoreFile> SEQ_ID =
1641       Ordering.compound(ImmutableList.of(
1642           Ordering.natural().onResultOf(new GetSeqId()),
1643           Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1644           Ordering.natural().onResultOf(new GetBulkTime()),
1645           Ordering.natural().onResultOf(new GetPathName())
1646       ));
1647 
1648     private static class GetSeqId implements Function<StoreFile, Long> {
1649       @Override
1650       public Long apply(StoreFile sf) {
1651         return sf.getMaxSequenceId();
1652       }
1653     }
1654 
1655     private static class GetFileSize implements Function<StoreFile, Long> {
1656       @Override
1657       public Long apply(StoreFile sf) {
1658         return sf.getReader().length();
1659       }
1660     }
1661 
1662     private static class GetBulkTime implements Function<StoreFile, Long> {
1663       @Override
1664       public Long apply(StoreFile sf) {
1665         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1666         return sf.getBulkLoadTimestamp();
1667       }
1668     }
1669 
1670     private static class GetPathName implements Function<StoreFile, String> {
1671       @Override
1672       public String apply(StoreFile sf) {
1673         return sf.getPath().getName();
1674       }
1675     }
1676   }
1677 }