View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.DataInput;
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.nio.ByteBuffer;
25  import java.util.Arrays;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.Comparator;
29  import java.util.Map;
30  import java.util.SortedSet;
31  import java.util.UUID;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
43  import org.apache.hadoop.hbase.KeyValue;
44  import org.apache.hadoop.hbase.KeyValue.KVComparator;
45  import org.apache.hadoop.hbase.KeyValueUtil;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.client.Scan;
48  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
49  import org.apache.hadoop.hbase.io.TimeRange;
50  import org.apache.hadoop.hbase.io.hfile.BlockType;
51  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
52  import org.apache.hadoop.hbase.io.hfile.HFile;
53  import org.apache.hadoop.hbase.io.hfile.HFileContext;
54  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
55  import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
56  import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
57  import org.apache.hadoop.hbase.util.BloomFilter;
58  import org.apache.hadoop.hbase.util.BloomFilterFactory;
59  import org.apache.hadoop.hbase.util.BloomFilterWriter;
60  import org.apache.hadoop.hbase.util.Bytes;
61  import org.apache.hadoop.hbase.util.Writables;
62  import org.apache.hadoop.io.WritableUtils;
63  
64  import com.google.common.base.Function;
65  import com.google.common.base.Preconditions;
66  import com.google.common.collect.ImmutableList;
67  import com.google.common.collect.Ordering;
68  
69  /**
70   * A Store data file.  Stores usually have one or more of these files.  They
71   * are produced by flushing the memstore to disk.  To
72   * create, instantiate a writer using {@link StoreFile.WriterBuilder}
73   * and append data. Be sure to add any metadata before calling close on the
74   * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
75   * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
76   * passing filesystem and path.  To read, call {@link #createReader()}.
77   * <p>StoreFiles may also reference store files in another Store.
78   *
79   * The reason for this weird pattern where you use a different instance for the
80   * writer and a reader is that we write once but read a lot more.
81   */
82  @InterfaceAudience.LimitedPrivate("Coprocessor")
83  public class StoreFile {
  private static final Log LOG = LogFactory.getLog(StoreFile.class.getName());

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY =
      Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Minor compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY =
      Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT =
      Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata*/
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata*/
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  // Immutable identity of this store file; resolves the path and reference info.
  private final StoreFileInfo fileInfo;
  private final FileSystem fs;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.  -1 means "not yet read from file metadata".
  private long sequenceid = -1;

  // max of the MemstoreTS in the KV's in this store
  // Set when we obtain a Reader.  -1 means "not yet read from file metadata".
  private long maxMemstoreTS = -1;

  // firstKey, lastkey and cellComparator will be set when openReader.
  // Cached here so they remain available after the reader is closed.
  private byte[] firstKey;

  private byte[] lastKey;

  private KVComparator comparator;
136 
  /** @return the block cache configuration used when opening readers on this file. */
  CacheConfig getCacheConf() {
    return cacheConf;
  }

  /** @return first key of the file, cached at open time; null until a reader has been opened. */
  public byte[] getFirstKey() {
    return firstKey;
  }

  /** @return last key of the file, cached at open time; null until a reader has been opened. */
  public byte[] getLastKey() {
    return lastKey;
  }

  /** @return comparator used by this file's reader; null until a reader has been opened. */
  public KVComparator getComparator() {
    return comparator;
  }

  /** @return largest memstoreTS found in this file, or -1 if not yet read from metadata. */
  public long getMaxMemstoreTS() {
    return maxMemstoreTS;
  }

  /** @param maxMemstoreTS the largest memstoreTS to record for this file. */
  public void setMaxMemstoreTS(long maxMemstoreTS) {
    this.maxMemstoreTS = maxMemstoreTS;
  }
160 
  // If true, this file was product of a major compaction.  Its then set
  // whenever you get a Reader.  Null means "not yet read from file metadata"
  // (see isMajorCompaction(), which throws until this is populated).
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY =
    Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY =
    Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Map of the metadata entries in the corresponding HFile
   */
  private Map<byte[], byte[]> metadataMap;

  // StoreFile.Reader.  Volatile because it is created lazily and read from
  // multiple threads; null until createReader()/open() runs.
  private volatile Reader reader;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;
188 
  /**
   * Constructor, loads a reader and it's indices, etc. May allocate a
   * substantial amount of ram depending on the underlying files (10-20MB?).
   *
   * @param fs  The current file system to use.
   * @param p  The path of the file.
   * @param conf  The current configuration.
   * @param cacheConf  The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified
   *          by column family configuration. This may or may not be the same
   *          as the Bloom filter type actually present in the HFile, because
   *          column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @throws IOException When opening the reader fails.
   */
  public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
        final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
    // Delegates to the StoreFileInfo-based constructor after resolving the path.
    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
  }
208 
209 
210   /**
211    * Constructor, loads a reader and it's indices, etc. May allocate a
212    * substantial amount of ram depending on the underlying files (10-20MB?).
213    *
214    * @param fs  The current file system to use.
215    * @param fileInfo  The store file information.
216    * @param conf  The current configuration.
217    * @param cacheConf  The cache configuration and block cache reference.
218    * @param cfBloomType The bloom type to use for this store file as specified
219    *          by column family configuration. This may or may not be the same
220    *          as the Bloom filter type actually present in the HFile, because
221    *          column family configuration might change. If this is
222    *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
223    * @throws IOException When opening the reader fails.
224    */
225   public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
226       final CacheConfig cacheConf,  final BloomType cfBloomType) throws IOException {
227     this.fs = fs;
228     this.fileInfo = fileInfo;
229     this.cacheConf = cacheConf;
230 
231     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
232       this.cfBloomType = cfBloomType;
233     } else {
234       LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
235           "cfBloomType=" + cfBloomType + " (disabled in config)");
236       this.cfBloomType = BloomType.NONE;
237     }
238   }
239 
  /**
   * Clone
   * @param other The StoreFile to clone from
   */
  public StoreFile(final StoreFile other) {
    // Shallow copy of identity fields only; the reader is NOT copied and must
    // be re-created via createReader() on the clone.
    this.fs = other.fs;
    this.fileInfo = other.fileInfo;
    this.cacheConf = other.cacheConf;
    this.cfBloomType = other.cfBloomType;
  }
250 
  /**
   * @return the StoreFileInfo describing this StoreFile (path, reference
   *         status, etc.). Never null; set at construction time.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  /**
   * @return Path or null if this StoreFile was made with a Stream.
   */
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  /**
   * @return Returns the qualified path of this StoreFile
   */
  public Path getQualifiedPath() {
    return this.fileInfo.getPath().makeQualified(fs);
  }

  /**
   * @return True if this is a StoreFile Reference; call
   * after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
   */
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  /**
   * @return True if this file was made by a major compaction.
   * @throws NullPointerException if called before a reader has been opened
   *         (the flag is only populated from file metadata at open time).
   */
  public boolean isMajorCompaction() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("This has not been set yet");
    }
    return this.majorCompaction.get();
  }

  /**
   * @return True if this file should not be part of a minor compaction.
   */
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  /**
   * @return This files maximum edit sequence id.
   */
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  /** @return modification time of the underlying file, or 0 if no file info is present. */
  public long getModificationTimeStamp() throws IOException {
    return (fileInfo == null) ? 0 : fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy
   * @param key
   * @return value associated with the metadata key, or null if absent.
   *         NOTE(review): metadataMap is only populated by open(); calling this
   *         before a reader exists will NPE — confirm callers open first.
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }
317 
318   /**
319    * Return the largest memstoreTS found across all storefiles in
320    * the given list. Store files that were created by a mapreduce
321    * bulk load are ignored, as they do not correspond to any specific
322    * put operation, and thus do not have a memstoreTS associated with them.
323    * @return 0 if no non-bulk-load files are provided or, this is Store that
324    * does not yet have any store files.
325    */
326   public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
327     long max = 0;
328     for (StoreFile sf : sfs) {
329       if (!sf.isBulkLoadResult()) {
330         max = Math.max(max, sf.getMaxMemstoreTS());
331       }
332     }
333     return max;
334   }
335 
336   /**
337    * Return the highest sequence ID found across all storefiles in
338    * the given list.
339    * @param sfs
340    * @return 0 if no non-bulk-load files are provided or, this is Store that
341    * does not yet have any store files.
342    */
343   public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
344     long max = 0;
345     for (StoreFile sf : sfs) {
346       max = Math.max(max, sf.getMaxSequenceId());
347     }
348     return max;
349   }
350 
351   /**
352    * Check if this storefile was created by bulk load.
353    * When a hfile is bulk loaded into HBase, we append
354    * '_SeqId_<id-when-loaded>' to the hfile name, unless
355    * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
356    * explicitly turned off.
357    * If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
358    * is turned off, fall back to BULKLOAD_TIME_KEY.
359    * @return true if this storefile was created by bulk load.
360    */
361   boolean isBulkLoadResult() {
362     boolean bulkLoadedHFile = false;
363     String fileName = this.getPath().getName();
364     int startPos = fileName.indexOf("SeqId_");
365     if (startPos != -1) {
366       bulkLoadedHFile = true;
367     }
368     return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
369   }
370 
  /**
   * Return the timestamp at which this bulk load file was generated.
   * @return the BULKLOAD_TIME_KEY metadata value, or 0 if this file has none.
   */
  public long getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is
   * calculated when store file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }
386 
  /**
   * Opens reader on this store file.  Called by Constructor.
   * Loads the HFile metadata map and from it populates the sequence id,
   * memstoreTS, compaction flags, Bloom filters, time range, and the cached
   * first/last keys and comparator.
   * @return Reader for the store file.
   * @throws IOException
   * @see #closeReader(boolean)
   */
  private Reader open(boolean canUseDropBehind) throws IOException {
    if (this.reader != null) {
      throw new IllegalAccessError("Already open");
    }

    // Open the StoreFile.Reader
    this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);

    // Load up indices and fileinfo. This also loads Bloom filter type.
    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

    // Read in our metadata.
    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if halfhfile, top half has a sequence number > bottom
      // half. Thats why we add one in below. Its done for case the two halves
      // are ever merged back together --rare.  Without it, on open of store,
      // since store files are distinguished by sequence id, the one half would
      // subsume the other.
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()){
      // generate the sequenceId from the fileName
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        // 6 == "SeqId_".length(); the id runs to the next '_' separator.
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      this.reader.setBulkLoaded(true);
    }
    this.reader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicity say so
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    // Only load the general Bloom filter if the CF still wants one; log when
    // the type stored in the file disagrees with the CF configuration.
    BloomType hfileBloomType = reader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.info("HFile Bloom filter type for "
            + reader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + reader.getHFileReader().getName());
    }

    // load delete family bloom filter
    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
      if (timerangeBytes != null) {
        this.reader.timeRangeTracker = new TimeRangeTracker();
        Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
      }
    } catch (IllegalArgumentException e) {
      // Corrupt/unparseable time range metadata is not fatal; proceed without it.
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.reader.timeRangeTracker = null;
    }
    // initialize so we can reuse them after reader closed.
    firstKey = reader.getFirstKey();
    lastKey = reader.getLastKey();
    comparator = reader.getComparator();
    return this.reader;
  }
492 
  /** Convenience over {@link #createReader(boolean)} with drop-behind disabled. */
  public Reader createReader() throws IOException {
    return createReader(false);
  }
496 
497   /**
498    * @return Reader for StoreFile. creates if necessary
499    * @throws IOException
500    */
501   public Reader createReader(boolean canUseDropBehind) throws IOException {
502     if (this.reader == null) {
503       try {
504         this.reader = open(canUseDropBehind);
505       } catch (IOException e) {
506         try {
507           boolean evictOnClose =
508               cacheConf != null? cacheConf.shouldEvictOnClose(): true; 
509           this.closeReader(evictOnClose);
510         } catch (IOException ee) {
511         }
512         throw e;
513       }
514 
515     }
516     return this.reader;
517   }
518 
  /**
   * @return Current reader.  Must call createReader first else returns null.
   * @see #createReader()
   */
  public Reader getReader() {
    return this.reader;
  }

  /**
   * Closes the current reader, if any, and clears the reference so a later
   * createReader() can reopen. Synchronized to avoid racing double-close.
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException
   */
  public synchronized void closeReader(boolean evictOnClose)
      throws IOException {
    if (this.reader != null) {
      this.reader.close(evictOnClose);
      this.reader = null;
    }
  }
538 
539   /**
540    * Delete this file
541    * @throws IOException
542    */
543   public void deleteReader() throws IOException {
544     boolean evictOnClose =
545         cacheConf != null? cacheConf.shouldEvictOnClose(): true; 
546     closeReader(evictOnClose);
547     this.fs.delete(getPath(), true);
548   }
549 
550   @Override
551   public String toString() {
552     return this.fileInfo.toString();
553   }
554 
555   /**
556    * @return a length description of this StoreFile, suitable for debug output
557    */
558   public String toStringDetailed() {
559     StringBuilder sb = new StringBuilder();
560     sb.append(this.getPath().toString());
561     sb.append(", isReference=").append(isReference());
562     sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
563     if (isBulkLoadResult()) {
564       sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
565     } else {
566       sb.append(", seqid=").append(getMaxSequenceId());
567     }
568     sb.append(", majorCompaction=").append(isMajorCompaction());
569 
570     return sb.toString();
571   }
572 
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification="Will not overflow")
  public static class WriterBuilder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private KeyValue.KVComparator comparator = KeyValue.COMPARATOR;
    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    // Exactly one of dir / filePath may be set before build().
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;

    /**
     * @param conf user configuration
     * @param cacheConf block cache configuration
     * @param fs filesystem to write to
     */
    public WriterBuilder(Configuration conf, CacheConfig cacheConf,
        FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if
     *          does not exist. The file is given a unique name within this
     *          directory.
     * @return this (for chained invocation)
     */
    public WriterBuilder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public WriterBuilder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public WriterBuilder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    /**
     * @param comparator key comparator to use; defaults to KeyValue.COMPARATOR
     * @return this (for chained invocation)
     */
    public WriterBuilder withComparator(KeyValue.KVComparator comparator) {
      Preconditions.checkNotNull(comparator);
      this.comparator = comparator;
      return this;
    }

    /**
     * @param bloomType bloom filter setting for the new file
     * @return this (for chained invocation)
     */
    public WriterBuilder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public WriterBuilder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    /**
     * @param fileContext the HFile context for the new file
     * @return this (for chained invocation)
     */
    public WriterBuilder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    /**
     * @param shouldDropCacheBehind drop pages from the OS page cache after writing
     * @return this (for chained invocation)
     */
    public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    /**
     * Create a store file writer. Client is responsible for closing file when
     * done. If metadata, add BEFORE closing using
     * {@link Writer#appendMetadata}.
     */
    public Writer build() throws IOException {
      // Require exactly one of withOutputDir / withFilePath to have been called.
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory " +
            "or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        fs.mkdirs(dir);
      }

      if (filePath == null) {
        filePath = getUniqueFile(fs, dir);
        // Honor the global bloom-disable switch only when we pick the name
        // ourselves (i.e. the usual flush/compaction path).
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (comparator == null) {
        comparator = KeyValue.COMPARATOR;
      }
      return new Writer(fs, filePath,
          conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind);
    }
  }
691 
692   /**
693    * @param fs
694    * @param dir Directory to create file in.
695    * @return random filename inside passed <code>dir</code>
696    */
697   public static Path getUniqueFile(final FileSystem fs, final Path dir)
698       throws IOException {
699     if (!fs.getFileStatus(dir).isDirectory()) {
700       throw new IOException("Expecting " + dir.toString() +
701         " to be a directory");
702     }
703     return new Path(dir, UUID.randomUUID().toString().replaceAll("-", ""));
704   }
705 
706   public Long getMinimumTimestamp() {
707     return (getReader().timeRangeTracker == null) ?
708         null :
709         getReader().timeRangeTracker.getMinimumTimestamp();
710   }
711 
  /**
   * Gets the approximate mid-point of this file that is optimal for use in splitting it.
   * @param comparator Comparator used to compare KVs.
   * @return The split point row, or null if splitting is not possible, or reader is null.
   */
  @SuppressWarnings("deprecation")
  byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
    if (this.reader == null) {
      LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
      return null;
    }
    // Get first, last, and mid keys.  Midkey is the key that starts block
    // in middle of hfile.  Has column and timestamp.  Need to return just
    // the row we want to split on as midkey.
    byte [] midkey = this.reader.midkey();
    if (midkey != null) {
      KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
      byte [] fk = this.reader.getFirstKey();
      KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
      byte [] lk = this.reader.getLastKey();
      KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
      // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
      if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("cannot split because midkey is the same as first or last row");
        }
        return null;
      }
      return mk.getRow();
    }
    // No midkey available (e.g. empty file): nothing to split on.
    return null;
  }
744 
745   /**
746    * A StoreFile writer.  Use this to read/write HBase Store Files. It is package
747    * local because it is an implementation detail of the HBase regionserver.
748    */
749   public static class Writer implements Compactor.CellSink {
    // General (row or rowcol) Bloom filter writer; null when blooms are disabled.
    private final BloomFilterWriter generalBloomFilterWriter;
    // Delete-family Bloom filter writer; null when the general bloom is ROWCOL.
    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
    private final BloomType bloomType;
    // Last key added to the general bloom, kept for the LAST_BLOOM_KEY metadata.
    private byte[] lastBloomKey;
    private int lastBloomKeyOffset, lastBloomKeyLen;
    private KVComparator kvComparator;
    private Cell lastCell = null;
    // Earliest Put timestamp seen so far; written out as EARLIEST_PUT_TS.
    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    private Cell lastDeleteFamilyCell = null;
    private long deleteFamilyCnt = 0;

    TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
    /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
     * When flushing a memstore, we set TimeRange and use this variable to
     * indicate that it doesn't need to be calculated again while
     * appending KeyValues.
     * It is not set in cases of compactions when it is recalculated using only
     * the appended KeyValues*/
    boolean isTimeRangeTrackerSet = false;

    protected HFile.Writer writer;
771 
    /**
     * Creates an HFile.Writer that also write helpful meta data.
     * @param fs file system to write to
     * @param path file name to create
     * @param conf user configuration
     * @param comparator key comparator
     * @param bloomType bloom filter setting
     * @param maxKeys the expected maximum number of keys to be added. Was used
     *        for Bloom filter size in {@link HFile} format version 1.
     * @param favoredNodes
     * @param fileContext - The HFile context
     * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
     * @throws IOException problem writing to FS
     */
    private Writer(FileSystem fs, Path path,
        final Configuration conf,
        CacheConfig cacheConf,
        final KVComparator comparator, BloomType bloomType, long maxKeys,
        InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind)
            throws IOException {
      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withComparator(comparator)
          .withFavoredNodes(favoredNodes)
          .withFileContext(fileContext)
          .withShouldDropCacheBehind(shouldDropCacheBehind)
          .create();

      this.kvComparator = comparator;

      // Factory may return null (blooms disabled or type NONE); clamp the key
      // estimate to int range as the bloom sizing API takes an int.
      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
          conf, cacheConf, bloomType,
          (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

      if (generalBloomFilterWriter != null) {
        this.bloomType = bloomType;
        if (LOG.isTraceEnabled()) LOG.trace("Bloom filter type for " + path + ": " +
          this.bloomType + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      } else {
        // Not using Bloom filters.
        this.bloomType = BloomType.NONE;
      }

      // initialize delete family Bloom filter when there is NO RowCol Bloom
      // filter
      if (this.bloomType != BloomType.ROWCOL) {
        this.deleteFamilyBloomFilterWriter = BloomFilterFactory
            .createDeleteBloomAtWrite(conf, cacheConf,
                (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      } else {
        deleteFamilyBloomFilterWriter = null;
      }
      if (deleteFamilyBloomFilterWriter != null) {
        if (LOG.isTraceEnabled()) LOG.trace("Delete Family Bloom filter type for " + path + ": "
            + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
      }
    }
829 
830     /**
831      * Writes meta data.
832      * Call before {@link #close()} since its written as meta data to this file.
833      * @param maxSequenceId Maximum sequence id.
834      * @param majorCompaction True if this file is product of a major compaction
835      * @throws IOException problem writing to FS
836      */
837     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
838     throws IOException {
839       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
840       writer.appendFileInfo(MAJOR_COMPACTION_KEY,
841           Bytes.toBytes(majorCompaction));
842       appendTrackedTimestampsToMetadata();
843     }
844 
845     /**
846      * Add TimestampRange and earliest put timestamp to Metadata
847      */
848     public void appendTrackedTimestampsToMetadata() throws IOException {
849       appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
850       appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
851     }
852 
853     /**
854      * Set TimeRangeTracker
855      * @param trt
856      */
857     public void setTimeRangeTracker(final TimeRangeTracker trt) {
858       this.timeRangeTracker = trt;
859       isTimeRangeTrackerSet = true;
860     }
861 
862     /**
863      * Record the earlest Put timestamp.
864      *
865      * If the timeRangeTracker is not set,
866      * update TimeRangeTracker to include the timestamp of this key
867      * @param cell
868      */
869     public void trackTimestamps(final Cell cell) {
870       if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
871         earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
872       }
873       if (!isTimeRangeTrackerSet) {
874         timeRangeTracker.includeTimestamp(cell);
875       }
876     }
877 
    // Feeds the general Bloom filter for this cell. The Bloom key is the row
    // alone (ROW) or row + qualifier (ROWCOL). No-op when no general Bloom
    // filter is configured.
    private void appendGeneralBloomfilter(final Cell cell) throws IOException {
      if (this.generalBloomFilterWriter != null) {
        // only add to the bloom filter on a new, unique key
        boolean newKey = true;
        if (this.lastCell != null) {
          switch(bloomType) {
          case ROW:
            newKey = ! kvComparator.matchingRows(cell, lastCell);
            break;
          case ROWCOL:
            newKey = ! kvComparator.matchingRowColumn(cell, lastCell);
            break;
          case NONE:
            newKey = false;
            break;
          default:
            throw new IOException("Invalid Bloom filter type: " + bloomType +
                " (ROW or ROWCOL expected)");
          }
        }
        if (newKey) {
          /*
           * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
           * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
           *
           * 2 Types of Filtering:
           *  1. Row = Row
           *  2. RowCol = Row + Qualifier
           */
          byte[] bloomKey;
          int bloomKeyOffset, bloomKeyLen;

          switch (bloomType) {
          case ROW:
            // Row-only key: reference the cell's backing array directly (no copy).
            bloomKey = cell.getRowArray();
            bloomKeyOffset = cell.getRowOffset();
            bloomKeyLen = cell.getRowLength();
            break;
          case ROWCOL:
            // merge(row, qualifier)
            // TODO: could save one buffer copy in case of compound Bloom
            // filters when this involves creating a KeyValue
            bloomKey = generalBloomFilterWriter.createBloomKey(cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength(), cell.getQualifierArray(),
                cell.getQualifierOffset(), cell.getQualifierLength());
            bloomKeyOffset = 0;
            bloomKeyLen = bloomKey.length;
            break;
          default:
            throw new IOException("Invalid Bloom filter type: " + bloomType +
                " (ROW or ROWCOL expected)");
          }
          generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
          // Keys must arrive in strictly increasing order; a non-increasing
          // Bloom key means the caller is appending cells out of order.
          if (lastBloomKey != null
              && generalBloomFilterWriter.getComparator().compareFlatKey(bloomKey,
                  bloomKeyOffset, bloomKeyLen, lastBloomKey,
                  lastBloomKeyOffset, lastBloomKeyLen) <= 0) {
            throw new IOException("Non-increasing Bloom keys: "
                + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen)
                + " after "
                + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset,
                    lastBloomKeyLen));
          }
          lastBloomKey = bloomKey;
          lastBloomKeyOffset = bloomKeyOffset;
          lastBloomKeyLen = bloomKeyLen;
          this.lastCell = cell;
        }
      }
    }
948 
949     private void appendDeleteFamilyBloomFilter(final Cell cell)
950         throws IOException {
951       if (!CellUtil.isDeleteFamily(cell) && !CellUtil.isDeleteFamilyVersion(cell)) {
952         return;
953       }
954 
955       // increase the number of delete family in the store file
956       deleteFamilyCnt++;
957       if (null != this.deleteFamilyBloomFilterWriter) {
958         boolean newKey = true;
959         if (lastDeleteFamilyCell != null) {
960           newKey = !kvComparator.matchingRows(cell, lastDeleteFamilyCell);
961         }
962         if (newKey) {
963           this.deleteFamilyBloomFilterWriter.add(cell.getRowArray(),
964               cell.getRowOffset(), cell.getRowLength());
965           this.lastDeleteFamilyCell = cell;
966         }
967       }
968     }
969 
    /**
     * Appends a cell to this store file, updating the general and
     * delete-family Bloom filters and the timestamp tracking as side effects.
     * @param cell cell to append
     * @throws IOException problem writing to FS
     */
    public void append(final Cell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }
976 
977     public Path getPath() {
978       return this.writer.getPath();
979     }
980 
981     boolean hasGeneralBloom() {
982       return this.generalBloomFilterWriter != null;
983     }
984 
985     /**
986      * For unit testing only.
987      *
988      * @return the Bloom filter used by this writer.
989      */
990     BloomFilterWriter getGeneralBloomWriter() {
991       return generalBloomFilterWriter;
992     }
993 
994     private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
995       boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
996       if (haveBloom) {
997         bfw.compactBloom();
998       }
999       return haveBloom;
1000     }
1001 
    // Finalizes the general Bloom filter (if it has keys) and records its
    // type and last key in the file info so readers can use them later.
    private boolean closeGeneralBloomFilter() throws IOException {
      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

      // add the general Bloom filter writer and append file info
      if (hasGeneralBloom) {
        writer.addGeneralBloomFilter(generalBloomFilterWriter);
        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
            Bytes.toBytes(bloomType.toString()));
        if (lastBloomKey != null) {
          // Copy the key out of its (possibly shared) backing array before
          // storing it as file metadata.
          writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
              lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
                  + lastBloomKeyLen));
        }
      }
      return hasGeneralBloom;
    }
1018 
    // Finalizes the delete-family Bloom filter (if any) and always records
    // the delete-family marker count in the file info.
    private boolean closeDeleteFamilyBloomFilter() throws IOException {
      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

      // add the delete family Bloom filter writer
      if (hasDeleteFamilyBloom) {
        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
      }

      // append file info about the number of delete family kvs
      // even if there is no delete family Bloom.
      writer.appendFileInfo(DELETE_FAMILY_COUNT,
          Bytes.toBytes(this.deleteFamilyCnt));

      return hasDeleteFamilyBloom;
    }
1034 
    /**
     * Finalizes both Bloom filters and closes the underlying HFile writer.
     * The Bloom filters must be closed first so their data and metadata are
     * flushed into the file before the writer is finished.
     * @throws IOException problem writing to FS
     */
    public void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

      writer.close();

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      if (StoreFile.LOG.isTraceEnabled()) {
        StoreFile.LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
          (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " +
          getPath());
      }

    }
1050 
1051     public void appendFileInfo(byte[] key, byte[] value) throws IOException {
1052       writer.appendFileInfo(key, value);
1053     }
1054 
1055     /** For use in testing, e.g. {@link org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile}
1056      */
1057     HFile.Writer getHFileWriter() {
1058       return writer;
1059     }
1060   }
1061 
  /**
   * Reader for a StoreFile.
   */
  public static class Reader {
    private static final Log LOG = LogFactory.getLog(Reader.class.getName());

    // General (ROW or ROWCOL) Bloom filter; null when absent, not yet loaded,
    // or marked faulty after a read error.
    protected BloomFilter generalBloomFilter = null;
    // Bloom filter over rows that carry delete-family markers; same lifecycle
    // as generalBloomFilter.
    protected BloomFilter deleteFamilyBloomFilter = null;
    // Type of the general Bloom filter; NONE until loadFileInfo() reads it.
    protected BloomType bloomFilterType;
    private final HFile.Reader reader;
    // Timestamp range of this file; null means unknown, which makes the
    // time-range filter accept everything.
    protected TimeRangeTracker timeRangeTracker = null;
    protected long sequenceID = -1;
    // Last key added to the general Bloom filter, loaded from file info.
    private byte[] lastBloomKey;
    // Number of delete-family markers in this file; -1 when unknown.
    private long deleteFamilyCnt = -1;
    private boolean bulkLoadResult = false;

    public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
        throws IOException {
      reader = HFile.createReader(fs, path, cacheConf, conf);
      bloomFilterType = BloomType.NONE;
    }

    public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
        CacheConfig cacheConf, Configuration conf) throws IOException {
      reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
      bloomFilterType = BloomType.NONE;
    }

    // Marks whether this reader serves a primary-replica store file.
    public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) {
      reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile);
    }
    public boolean isPrimaryReplicaReader() {
      return reader.isPrimaryReplicaReader();
    }

    /**
     * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
     */
    Reader() {
      this.reader = null;
    }

    public KVComparator getComparator() {
      return reader.getComparator();
    }

    /**
     * Get a scanner to scan over this StoreFile. Do not use
     * this overload if using this scanner for compactions.
     *
     * @param cacheBlocks should this scanner cache blocks?
     * @param pread use pread (for highly concurrent small readers)
     * @return a scanner
     */
    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                               boolean pread) {
      return getStoreFileScanner(cacheBlocks, pread, false,
        // 0 is passed as readpoint because this method is only used by test
        // where StoreFile is directly operated upon
        0);
    }

    /**
     * Get a scanner to scan over this StoreFile.
     *
     * @param cacheBlocks should this scanner cache blocks?
     * @param pread use pread (for highly concurrent small readers)
     * @param isCompaction is scanner being used for compaction?
     * @return a scanner
     */
    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                               boolean pread,
                                               boolean isCompaction, long readPt) {
      return new StoreFileScanner(this,
                                 getScanner(cacheBlocks, pread, isCompaction),
                                 !isCompaction, reader.hasMVCCInfo(), readPt);
    }

    /**
     * Warning: Do not write further code which depends on this call. Instead
     * use getStoreFileScanner() which uses the StoreFileScanner class/interface
     * which is the preferred way to scan a store with higher level concepts.
     *
     * @param cacheBlocks should we cache the blocks?
     * @param pread use pread (for concurrent small readers)
     * @return the underlying HFileScanner
     */
    @Deprecated
    public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
      return getScanner(cacheBlocks, pread, false);
    }

    /**
     * Warning: Do not write further code which depends on this call. Instead
     * use getStoreFileScanner() which uses the StoreFileScanner class/interface
     * which is the preferred way to scan a store with higher level concepts.
     *
     * @param cacheBlocks
     *          should we cache the blocks?
     * @param pread
     *          use pread (for concurrent small readers)
     * @param isCompaction
     *          is scanner being used for compaction?
     * @return the underlying HFileScanner
     */
    @Deprecated
    public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
        boolean isCompaction) {
      return reader.getScanner(cacheBlocks, pread, isCompaction);
    }

    public void close(boolean evictOnClose) throws IOException {
      reader.close(evictOnClose);
    }

    /**
     * Check if this storeFile may contain keys within the TimeRange that
     * have not expired (i.e. not older than oldestUnexpiredTS).
     * @param timeRange the timeRange to restrict
     * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
     *          determined by the column family's TTL
     * @return false if queried keys definitely don't exist in this StoreFile
     */
    boolean passesTimerangeFilter(TimeRange timeRange, long oldestUnexpiredTS) {
      if (timeRangeTracker == null) {
        // No tracker loaded: cannot rule the file out, so let it pass.
        return true;
      } else {
        return timeRangeTracker.includesTimeRange(timeRange) &&
            timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
      }
    }

    /**
     * Checks whether the given scan passes the Bloom filter (if present). Only
     * checks Bloom filters for single-row or single-row-column scans. Bloom
     * filter checking for multi-gets is implemented as part of the store
     * scanner system (see {@link StoreFileScanner#seekExactly}) and uses
     * the lower-level API {@link #passesGeneralBloomFilter(byte[], int, int, byte[],
     * int, int)}.
     *
     * @param scan the scan specification. Used to determine the row, and to
     *          check whether this is a single-row ("get") scan.
     * @param columns the set of columns. Only used for row-column Bloom
     *          filters.
     * @return true if the scan with the given column set passes the Bloom
     *         filter, or if the Bloom filter is not applicable for the scan.
     *         False if the Bloom filter is applicable and the scan fails it.
     */
    boolean passesBloomFilter(Scan scan,
        final SortedSet<byte[]> columns) {
      // Multi-column non-get scans will use Bloom filters through the
      // lower-level API function that this function calls.
      if (!scan.isGetScan()) {
        return true;
      }

      byte[] row = scan.getStartRow();
      switch (this.bloomFilterType) {
        case ROW:
          return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0);

        case ROWCOL:
          // A ROWCOL Bloom can only be consulted when exactly one column is
          // requested; otherwise fall through to "pass".
          if (columns != null && columns.size() == 1) {
            byte[] column = columns.first();
            return passesGeneralBloomFilter(row, 0, row.length, column, 0,
                column.length);
          }

          // For multi-column queries the Bloom filter is checked from the
          // seekExact operation.
          return true;

        default:
          return true;
      }
    }

    // Returns false only when this file provably contains no delete-family
    // marker for the given row; errs on the side of true.
    public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
        int rowLen) {
      // Cache Bloom filter as a local variable in case it is set to null by
      // another thread on an IO error.
      BloomFilter bloomFilter = this.deleteFamilyBloomFilter;

      // Empty file or there is no delete family at all
      if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
        return false;
      }

      if (bloomFilter == null) {
        return true;
      }

      try {
        if (!bloomFilter.supportsAutoLoading()) {
          return true;
        }
        return bloomFilter.contains(row, rowOffset, rowLen, null);
      } catch (IllegalArgumentException e) {
        LOG.error("Bad Delete Family bloom filter data -- proceeding without",
            e);
        setDeleteFamilyBloomFilterFaulty();
      }

      return true;
    }

    /**
     * A method for checking Bloom filters. Called directly from
     * StoreFileScanner in case of a multi-column query.
     *
     * @param row
     * @param rowOffset
     * @param rowLen
     * @param col
     * @param colOffset
     * @param colLen
     * @return True if passes
     */
    public boolean passesGeneralBloomFilter(byte[] row, int rowOffset,
        int rowLen, byte[] col, int colOffset, int colLen) {
      // Cache Bloom filter as a local variable in case it is set to null by
      // another thread on an IO error.
      BloomFilter bloomFilter = this.generalBloomFilter;
      if (bloomFilter == null) {
        return true;
      }

      byte[] key;
      switch (bloomFilterType) {
        case ROW:
          if (col != null) {
            throw new RuntimeException("Row-only Bloom filter called with " +
                "column specified");
          }
          if (rowOffset != 0 || rowLen != row.length) {
              throw new AssertionError("For row-only Bloom filters the row "
                  + "must occupy the whole array");
          }
          key = row;
          break;

        case ROWCOL:
          key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
              colOffset, colLen);

          break;

        default:
          return true;
      }

      // Empty file
      if (reader.getTrailer().getEntryCount() == 0)
        return false;

      try {
        boolean shouldCheckBloom;
        ByteBuffer bloom;
        if (bloomFilter.supportsAutoLoading()) {
          bloom = null;
          shouldCheckBloom = true;
        } else {
          // Legacy (HFile v1 style) Bloom data stored as a meta block.
          bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
              true);
          shouldCheckBloom = bloom != null;
        }

        if (shouldCheckBloom) {
          boolean exists;

          // Whether the primary Bloom key is greater than the last Bloom key
          // from the file info. For row-column Bloom filters this is not yet
          // a sufficient condition to return false.
          boolean keyIsAfterLast = lastBloomKey != null
              && bloomFilter.getComparator().compareFlatKey(key, lastBloomKey) > 0;

          if (bloomFilterType == BloomType.ROWCOL) {
            // Since a Row Delete is essentially a DeleteFamily applied to all
            // columns, a file might be skipped if using row+col Bloom filter.
            // In order to ensure this file is included an additional check is
            // required looking only for a row bloom.
            byte[] rowBloomKey = bloomFilter.createBloomKey(row, rowOffset, rowLen,
                null, 0, 0);

            if (keyIsAfterLast
                && bloomFilter.getComparator().compareFlatKey(rowBloomKey,
                    lastBloomKey) > 0) {
              exists = false;
            } else {
              exists =
                  bloomFilter.contains(key, 0, key.length, bloom) ||
                  bloomFilter.contains(rowBloomKey, 0, rowBloomKey.length,
                      bloom);
            }
          } else {
            exists = !keyIsAfterLast
                && bloomFilter.contains(key, 0, key.length, bloom);
          }

          return exists;
        }
      } catch (IOException e) {
        LOG.error("Error reading bloom filter data -- proceeding without",
            e);
        setGeneralBloomFilterFaulty();
      } catch (IllegalArgumentException e) {
        LOG.error("Bad bloom filter data -- proceeding without", e);
        setGeneralBloomFilterFaulty();
      }

      return true;
    }

    /**
     * Checks whether the given scan rowkey range overlaps with the current storefile's
     * @param scan the scan specification. Used to determine the rowkey range.
     * @return true if there is overlap, false otherwise
     */
    public boolean passesKeyRangeFilter(Scan scan) {
      if (this.getFirstKey() == null || this.getLastKey() == null) {
        // the file is empty
        return false;
      }
      if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
          && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
        return true;
      }
      // For reversed scans start/stop rows are swapped relative to key order.
      KeyValue smallestScanKeyValue = scan.isReversed() ? KeyValueUtil
          .createFirstOnRow(scan.getStopRow()) : KeyValueUtil.createFirstOnRow(scan
          .getStartRow());
      KeyValue largestScanKeyValue = scan.isReversed() ? KeyValueUtil
          .createLastOnRow(scan.getStartRow()) : KeyValueUtil.createLastOnRow(scan
          .getStopRow());
      boolean nonOverLapping = (getComparator().compareFlatKey(
          this.getFirstKey(), largestScanKeyValue.getKey()) > 0 && !Bytes
          .equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
              HConstants.EMPTY_END_ROW))
          || getComparator().compareFlatKey(this.getLastKey(),
              smallestScanKeyValue.getKey()) < 0;
      return !nonOverLapping;
    }

    /**
     * Loads the HFile's file-info map and caches the Bloom filter type, the
     * last Bloom key, and the delete-family count from it.
     * @return the file-info map
     * @throws IOException problem reading from FS
     */
    public Map<byte[], byte[]> loadFileInfo() throws IOException {
      Map<byte [], byte []> fi = reader.loadFileInfo();

      byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
      if (b != null) {
        bloomFilterType = BloomType.valueOf(Bytes.toString(b));
      }

      lastBloomKey = fi.get(LAST_BLOOM_KEY);
      byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
      if (cnt != null) {
        deleteFamilyCnt = Bytes.toLong(cnt);
      }

      return fi;
    }

    // Loads both the general and the delete-family Bloom filters (if present).
    public void loadBloomfilter() {
      this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
    }

    // Loads one Bloom filter from its metadata block; on any error the filter
    // is marked faulty (dropped) rather than failing the reader.
    private void loadBloomfilter(BlockType blockType) {
      try {
        if (blockType == BlockType.GENERAL_BLOOM_META) {
          if (this.generalBloomFilter != null)
            return; // Bloom has been loaded

          DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
          if (bloomMeta != null) {
            // sanity check for NONE Bloom filter
            if (bloomFilterType == BloomType.NONE) {
              throw new IOException(
                  "valid bloom filter type not found in FileInfo");
            } else {
              generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
                  reader);
              if (LOG.isTraceEnabled()) {
                LOG.trace("Loaded " + bloomFilterType.toString() + " "
                  + generalBloomFilter.getClass().getSimpleName()
                  + " metadata for " + reader.getName());
              }
            }
          }
        } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
          if (this.deleteFamilyBloomFilter != null)
            return; // Bloom has been loaded

          DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
          if (bloomMeta != null) {
            deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
                bloomMeta, reader);
            LOG.info("Loaded Delete Family Bloom ("
                + deleteFamilyBloomFilter.getClass().getSimpleName()
                + ") metadata for " + reader.getName());
          }
        } else {
          throw new RuntimeException("Block Type: " + blockType.toString()
              + "is not supported for Bloom filter");
        }
      } catch (IOException e) {
        LOG.error("Error reading bloom filter meta for " + blockType
            + " -- proceeding without", e);
        setBloomFilterFaulty(blockType);
      } catch (IllegalArgumentException e) {
        LOG.error("Bad bloom filter meta " + blockType
            + " -- proceeding without", e);
        setBloomFilterFaulty(blockType);
      }
    }

    private void setBloomFilterFaulty(BlockType blockType) {
      if (blockType == BlockType.GENERAL_BLOOM_META) {
        setGeneralBloomFilterFaulty();
      } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
        setDeleteFamilyBloomFilterFaulty();
      }
    }

    /**
     * The number of Bloom filter entries in this store file, or an estimate
     * thereof, if the Bloom filter is not loaded. This always returns an upper
     * bound of the number of Bloom filter entries.
     *
     * @return an estimate of the number of Bloom filter entries in this file
     */
    public long getFilterEntries() {
      return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
          : reader.getEntries();
    }

    public void setGeneralBloomFilterFaulty() {
      generalBloomFilter = null;
    }

    public void setDeleteFamilyBloomFilterFaulty() {
      this.deleteFamilyBloomFilter = null;
    }

    public byte[] getLastKey() {
      return reader.getLastKey();
    }

    public byte[] getLastRowKey() {
      return reader.getLastRowKey();
    }

    public byte[] midkey() throws IOException {
      return reader.midkey();
    }

    public long length() {
      return reader.length();
    }

    public long getTotalUncompressedBytes() {
      return reader.getTrailer().getTotalUncompressedBytes();
    }

    public long getEntries() {
      return reader.getEntries();
    }

    public long getDeleteFamilyCnt() {
      return deleteFamilyCnt;
    }

    public byte[] getFirstKey() {
      return reader.getFirstKey();
    }

    public long indexSize() {
      return reader.indexSize();
    }

    public BloomType getBloomFilterType() {
      return this.bloomFilterType;
    }

    public long getSequenceID() {
      return sequenceID;
    }

    public void setSequenceID(long sequenceID) {
      this.sequenceID = sequenceID;
    }

    public void setBulkLoaded(boolean bulkLoadResult) {
      this.bulkLoadResult = bulkLoadResult;
    }

    public boolean isBulkLoaded() {
      return this.bulkLoadResult;
    }

    BloomFilter getGeneralBloomFilter() {
      return generalBloomFilter;
    }

    long getUncompressedDataIndexSize() {
      return reader.getTrailer().getUncompressedDataIndexSize();
    }

    public long getTotalBloomSize() {
      if (generalBloomFilter == null)
        return 0;
      return generalBloomFilter.getByteSize();
    }

    public int getHFileVersion() {
      return reader.getTrailer().getMajorVersion();
    }

    public int getHFileMinorVersion() {
      return reader.getTrailer().getMinorVersion();
    }

    public HFile.Reader getHFileReader() {
      return reader;
    }

    void disableBloomFilterForTesting() {
      generalBloomFilter = null;
      this.deleteFamilyBloomFilter = null;
    }

    public long getMaxTimestamp() {
      return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
    }
  }
1594 
1595   /**
1596    * Useful comparators for comparing StoreFiles.
1597    */
1598   public abstract static class Comparators {
1599     /**
1600      * Comparator that compares based on the Sequence Ids of the
1601      * the StoreFiles. Bulk loads that did not request a seq ID
1602      * are given a seq id of -1; thus, they are placed before all non-
1603      * bulk loads, and bulk loads with sequence Id. Among these files,
1604      * the size is used to determine the ordering, then bulkLoadTime.
1605      * If there are ties, the path name is used as a tie-breaker.
1606      */
1607     public static final Comparator<StoreFile> SEQ_ID =
1608       Ordering.compound(ImmutableList.of(
1609           Ordering.natural().onResultOf(new GetSeqId()),
1610           Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1611           Ordering.natural().onResultOf(new GetBulkTime()),
1612           Ordering.natural().onResultOf(new GetPathName())
1613       ));
1614 
1615     private static class GetSeqId implements Function<StoreFile, Long> {
1616       @Override
1617       public Long apply(StoreFile sf) {
1618         return sf.getMaxSequenceId();
1619       }
1620     }
1621 
1622     private static class GetFileSize implements Function<StoreFile, Long> {
1623       @Override
1624       public Long apply(StoreFile sf) {
1625         return sf.getReader().length();
1626       }
1627     }
1628 
1629     private static class GetBulkTime implements Function<StoreFile, Long> {
1630       @Override
1631       public Long apply(StoreFile sf) {
1632         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1633         return sf.getBulkLoadTimestamp();
1634       }
1635     }
1636 
1637     private static class GetPathName implements Function<StoreFile, String> {
1638       @Override
1639       public String apply(StoreFile sf) {
1640         return sf.getPath().getName();
1641       }
1642     }
1643   }
1644 }