/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.DataInput;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
/**
 * A Store data file.  Stores usually have one or more of these files.  They
 * are produced by flushing the memstore to disk.  To
 * create, instantiate a writer using {@link StoreFile.WriterBuilder}
 * and append data. Be sure to add any metadata before calling close on the
 * Writer (use the appendMetadata convenience methods). On close, a StoreFile
 * is sitting in the Filesystem.  To refer to it, create a StoreFile instance
 * passing filesystem and path.  To read, call {@link #createReader()}.
 * <p>StoreFiles may also reference store files in another Store.
 *
 * The reason for this pattern, where different instances are used for the
 * writer and the reader, is that we write once but read many times.
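 *
 * <p>A minimal usage sketch (names such as {@code conf}, {@code cacheConf},
 * {@code fs}, {@code familyDir}, {@code ctx}, {@code cell} and
 * {@code maxSeqId} are assumed to be supplied by the caller):
 * <pre>
 *   StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs)
 *       .withOutputDir(familyDir)
 *       .withFileContext(ctx)
 *       .build();
 *   w.append(cell);
 *   w.appendMetadata(maxSeqId, false); // metadata goes in BEFORE close
 *   w.close();
 *   StoreFile sf = new StoreFile(fs, w.getPath(), conf, cacheConf, BloomType.NONE);
 *   StoreFile.Reader r = sf.createReader();
 * </pre>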
 */
@InterfaceAudience.LimitedPrivate("Coprocessor")
public class StoreFile {
  private static final Log LOG = LogFactory.getLog(StoreFile.class.getName());

  // Keys for fileinfo values in HFile

  /** Max Sequence ID in FileInfo */
  public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");

  /** Major compaction flag in FileInfo */
  public static final byte[] MAJOR_COMPACTION_KEY =
      Bytes.toBytes("MAJOR_COMPACTION_KEY");

  /** Minor compaction flag in FileInfo */
  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");

  /** Bloom filter Type in FileInfo */
  public static final byte[] BLOOM_FILTER_TYPE_KEY =
      Bytes.toBytes("BLOOM_FILTER_TYPE");

  /** Delete Family Count in FileInfo */
  public static final byte[] DELETE_FAMILY_COUNT =
      Bytes.toBytes("DELETE_FAMILY_COUNT");

  /** Last Bloom filter key in FileInfo */
  private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");

  /** Key for Timerange information in metadata */
  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");

  /** Key for timestamp of earliest-put in metadata */
  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");

  private final StoreFileInfo fileInfo;
  private final FileSystem fs;

  // Block cache configuration and reference.
  private final CacheConfig cacheConf;

  // Keys for metadata stored in backing HFile.
  // Set when we obtain a Reader.
  private long sequenceid = -1;

  // max of the MemstoreTS in the KVs in this store
  // Set when we obtain a Reader.
  private long maxMemstoreTS = -1;

  public long getMaxMemstoreTS() {
    return maxMemstoreTS;
  }

  public void setMaxMemstoreTS(long maxMemstoreTS) {
    this.maxMemstoreTS = maxMemstoreTS;
  }

  // If true, this file was a product of a major compaction.  It's then set
  // whenever you get a Reader.
  private AtomicBoolean majorCompaction = null;

  // If true, this file should not be included in minor compactions.
  // It's set whenever you get a Reader.
  private boolean excludeFromMinorCompaction = false;

  /** Meta key set when store file is a result of a bulk load */
  public static final byte[] BULKLOAD_TASK_KEY =
    Bytes.toBytes("BULKLOAD_SOURCE_TASK");
  public static final byte[] BULKLOAD_TIME_KEY =
    Bytes.toBytes("BULKLOAD_TIMESTAMP");

  /**
   * Map of the metadata entries in the corresponding HFile
   */
  private Map<byte[], byte[]> metadataMap;

  // StoreFile.Reader
  private volatile Reader reader;

  /**
   * Bloom filter type specified in column family configuration. Does not
   * necessarily correspond to the Bloom filter type present in the HFile.
   */
  private final BloomType cfBloomType;

  /**
   * Constructor; loads a reader and its indices, etc. May allocate a
   * substantial amount of RAM depending on the underlying files (10-20MB?).
   *
   * @param fs  The current file system to use.
   * @param p  The path of the file.
   * @param conf  The current configuration.
   * @param cacheConf  The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified
   *          by column family configuration. This may or may not be the same
   *          as the Bloom filter type actually present in the HFile, because
   *          column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @throws IOException When opening the reader fails.
   */
  public StoreFile(final FileSystem fs, final Path p, final Configuration conf,
        final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
    this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType);
  }

  /**
   * Constructor; loads a reader and its indices, etc. May allocate a
   * substantial amount of RAM depending on the underlying files (10-20MB?).
   *
   * @param fs  The current file system to use.
   * @param fileInfo  The store file information.
   * @param conf  The current configuration.
   * @param cacheConf  The cache configuration and block cache reference.
   * @param cfBloomType The bloom type to use for this store file as specified
   *          by column family configuration. This may or may not be the same
   *          as the Bloom filter type actually present in the HFile, because
   *          column family configuration might change. If this is
   *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
   * @throws IOException When opening the reader fails.
   */
  public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf,
      final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException {
    this.fs = fs;
    this.fileInfo = fileInfo;
    this.cacheConf = cacheConf;

    if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
      this.cfBloomType = cfBloomType;
    } else {
      LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " +
          "cfBloomType=" + cfBloomType + " (disabled in config)");
      this.cfBloomType = BloomType.NONE;
    }
  }

  /**
   * Clone
   * @param other The StoreFile to clone from
   */
  public StoreFile(final StoreFile other) {
    this.fs = other.fs;
    this.fileInfo = other.fileInfo;
    this.cacheConf = other.cacheConf;
    this.cfBloomType = other.cfBloomType;
  }

  /**
   * @return the StoreFileInfo for this StoreFile.
   */
  public StoreFileInfo getFileInfo() {
    return this.fileInfo;
  }

  /**
   * @return Path or null if this StoreFile was made with a Stream.
   */
  public Path getPath() {
    return this.fileInfo.getPath();
  }

  /**
   * @return the qualified path of this StoreFile
   */
  public Path getQualifiedPath() {
    return this.fileInfo.getPath().makeQualified(fs);
  }

  /**
   * @return True if this is a StoreFile reference; call after {@link #open()},
   * else you may get the wrong answer.
   */
  public boolean isReference() {
    return this.fileInfo.isReference();
  }

  /**
   * @return True if this file was made by a major compaction.
   */
  public boolean isMajorCompaction() {
    if (this.majorCompaction == null) {
      throw new NullPointerException("This has not been set yet");
    }
    return this.majorCompaction.get();
  }

  /**
   * @return True if this file should not be part of a minor compaction.
   */
  public boolean excludeFromMinorCompaction() {
    return this.excludeFromMinorCompaction;
  }

  /**
   * @return This file's maximum edit sequence id.
   */
  public long getMaxSequenceId() {
    return this.sequenceid;
  }

  public long getModificationTimeStamp() throws IOException {
    return (fileInfo == null) ? 0 : fileInfo.getModificationTime();
  }

  /**
   * Only used by the Striped Compaction Policy
   * @param key
   * @return value associated with the metadata key
   */
  public byte[] getMetadataValue(byte[] key) {
    return metadataMap.get(key);
  }

  /**
   * Return the largest memstoreTS found across all storefiles in
   * the given list. Store files that were created by a mapreduce
   * bulk load are ignored, as they do not correspond to any specific
   * put operation, and thus do not have a memstoreTS associated with them.
   * @return 0 if no non-bulk-load files are provided, or if this Store does
   * not yet have any store files.
   */
  public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) {
    long max = 0;
    for (StoreFile sf : sfs) {
      if (!sf.isBulkLoadResult()) {
        max = Math.max(max, sf.getMaxMemstoreTS());
      }
    }
    return max;
  }

  /**
   * Return the highest sequence ID found across all storefiles in
   * the given list.
   * @param sfs
   * @return 0 if no files are provided, or if this Store does not yet have
   * any store files.
   */
  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
    long max = 0;
    for (StoreFile sf : sfs) {
      max = Math.max(max, sf.getMaxSequenceId());
    }
    return max;
  }

  /**
   * Check if this storefile was created by bulk load.
   * When an HFile is bulk loaded into HBase, we append
   * '_SeqId_&lt;id-when-loaded&gt;' to the hfile name, unless
   * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
   * explicitly turned off.
   * If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
   * is turned off, fall back to BULKLOAD_TIME_KEY.
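   * <p>For example, a bulk-loaded file might be named something like
   * {@code 2f1eb57f4c9a4a56a9ab9f34e7a88d4e_SeqId_4_} (a hypothetical name;
   * the hash part varies).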
   * @return true if this storefile was created by bulk load.
   */
  boolean isBulkLoadResult() {
    boolean bulkLoadedHFile = false;
    String fileName = this.getPath().getName();
    int startPos = fileName.indexOf("SeqId_");
    if (startPos != -1) {
      bulkLoadedHFile = true;
    }
    return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
  }

  /**
   * Return the timestamp at which this bulk load file was generated.
   */
  public long getBulkLoadTimestamp() {
    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
    return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
  }

  /**
   * @return the cached value of HDFS blocks distribution. The cached value is
   * calculated when store file is opened.
   */
  public HDFSBlocksDistribution getHDFSBlockDistribution() {
    return this.fileInfo.getHDFSBlockDistribution();
  }

  /**
   * Opens reader on this store file.  Called by {@link #createReader()}.
   * @return Reader for the store file.
   * @throws IOException
   * @see #closeReader(boolean)
   */
  private Reader open() throws IOException {
    if (this.reader != null) {
      throw new IllegalAccessError("Already open");
    }

    // Open the StoreFile.Reader
    this.reader = fileInfo.open(this.fs, this.cacheConf);

    // Load up indices and fileinfo. This also loads Bloom filter type.
    metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());

    // Read in our metadata.
    byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
    if (b != null) {
      // By convention, if a half-hfile, the top half has a sequence number >
      // the bottom half.  That's why we add one below.  It's done in case the
      // two halves are ever merged back together (rare).  Without it, on open
      // of the store, since store files are distinguished by sequence id, one
      // half would subsume the other.
      this.sequenceid = Bytes.toLong(b);
      if (fileInfo.isTopReference()) {
        this.sequenceid += 1;
      }
    }

    if (isBulkLoadResult()) {
      // generate the sequenceId from the fileName
      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
      String fileName = this.getPath().getName();
      // Use lastIndexOf() to get the last, most recent bulk load seqId.
      int startPos = fileName.lastIndexOf("SeqId_");
      if (startPos != -1) {
        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
            fileName.indexOf('_', startPos + 6)));
        // Handle reference files as done above.
        if (fileInfo.isTopReference()) {
          this.sequenceid += 1;
        }
      }
      this.reader.setBulkLoaded(true);
    }
    this.reader.setSequenceID(this.sequenceid);

    b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
    if (b != null) {
      this.maxMemstoreTS = Bytes.toLong(b);
    }

    b = metadataMap.get(MAJOR_COMPACTION_KEY);
    if (b != null) {
      boolean mc = Bytes.toBoolean(b);
      if (this.majorCompaction == null) {
        this.majorCompaction = new AtomicBoolean(mc);
      } else {
        this.majorCompaction.set(mc);
      }
    } else {
      // Presume it is not major compacted if it doesn't explicitly say so.
      // HFileOutputFormat explicitly sets the major compacted key.
      this.majorCompaction = new AtomicBoolean(false);
    }

    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));

    BloomType hfileBloomType = reader.getBloomFilterType();
    if (cfBloomType != BloomType.NONE) {
      reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      if (hfileBloomType != cfBloomType) {
        LOG.info("HFile Bloom filter type for "
            + reader.getHFileReader().getName() + ": " + hfileBloomType
            + ", but " + cfBloomType + " specified in column family "
            + "configuration");
      }
    } else if (hfileBloomType != BloomType.NONE) {
      LOG.info("Bloom filter turned off by CF config for "
          + reader.getHFileReader().getName());
    }

    // load delete family bloom filter
    reader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);

    try {
      byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
      if (timerangeBytes != null) {
        this.reader.timeRangeTracker = new TimeRangeTracker();
        Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
      }
    } catch (IllegalArgumentException e) {
      LOG.error("Error reading timestamp range data from meta -- " +
          "proceeding without", e);
      this.reader.timeRangeTracker = null;
    }
    return this.reader;
  }

  /**
   * @return Reader for this StoreFile; creates one if necessary.
   * @throws IOException
   */
  public Reader createReader() throws IOException {
    if (this.reader == null) {
      try {
        this.reader = open();
      } catch (IOException e) {
        try {
          this.closeReader(true);
        } catch (IOException ee) {
        }
        throw e;
      }
    }
    return this.reader;
  }

  /**
   * @return Current reader.  Must call {@link #createReader()} first, else
   * this returns null.
   * @see #createReader()
   */
  public Reader getReader() {
    return this.reader;
  }

  /**
   * @param evictOnClose whether to evict blocks belonging to this file
   * @throws IOException
   */
  public synchronized void closeReader(boolean evictOnClose)
      throws IOException {
    if (this.reader != null) {
      this.reader.close(evictOnClose);
      this.reader = null;
    }
  }

  /**
   * Delete this file
   * @throws IOException
   */
  public void deleteReader() throws IOException {
    closeReader(true);
    this.fs.delete(getPath(), true);
  }

  @Override
  public String toString() {
    return this.fileInfo.toString();
  }

  /**
   * @return a detailed description of this StoreFile, suitable for debug output
   */
  public String toStringDetailed() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getPath().toString());
    sb.append(", isReference=").append(isReference());
    sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
    if (isBulkLoadResult()) {
      sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
    } else {
      sb.append(", seqid=").append(getMaxSequenceId());
    }
    sb.append(", majorCompaction=").append(isMajorCompaction());

    return sb.toString();
  }

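  /**
   * Fluent builder for {@link StoreFile.Writer} instances. A sketch of
   * typical use (assuming {@code conf}, {@code cacheConf}, {@code fs},
   * {@code familyDir}, {@code estimatedKeys} and {@code ctx} are supplied
   * by the caller):
   * <pre>
   *   StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs)
   *       .withOutputDir(familyDir)   // or .withFilePath(path), not both
   *       .withBloomType(BloomType.ROW)
   *       .withMaxKeyCount(estimatedKeys)
   *       .withFileContext(ctx)
   *       .build();
   * </pre>
   */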
  public static class WriterBuilder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private CellComparator comparator = CellComparator.COMPARATOR;
    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;

    public WriterBuilder(Configuration conf, CacheConfig cacheConf,
        FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created
     *          if it does not exist. The file is given a unique name within
     *          this directory.
     * @return this (for chained invocation)
     */
    public WriterBuilder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public WriterBuilder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public WriterBuilder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public WriterBuilder withComparator(CellComparator comparator) {
      Preconditions.checkNotNull(comparator);
      this.comparator = comparator;
      return this;
    }

    public WriterBuilder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public WriterBuilder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public WriterBuilder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    /**
     * Create a store file writer. The client is responsible for closing the
     * file when done. If you need to add metadata, do so BEFORE closing,
     * using {@link Writer#appendMetadata}.
     */
    public Writer build() throws IOException {
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory " +
            "or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        fs.mkdirs(dir);
      }

      if (filePath == null) {
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (comparator == null) {
        comparator = CellComparator.COMPARATOR;
      }
      return new Writer(fs, filePath,
          conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext);
    }
  }

  /**
   * @param fs
   * @param dir Directory to create file in.
   * @return random filename inside passed <code>dir</code>
   */
  public static Path getUniqueFile(final FileSystem fs, final Path dir)
      throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() +
        " to be a directory");
    }
    return new Path(dir, UUID.randomUUID().toString().replaceAll("-", ""));
  }

  public Long getMinimumTimestamp() {
    return (getReader().timeRangeTracker == null) ?
        null :
        getReader().timeRangeTracker.getMinimumTimestamp();
  }

  /**
   * Gets the approximate mid-point of this file that is optimal for use in splitting it.
   * @param comparator Comparator used to compare KVs.
   * @return The split point row, or null if splitting is not possible, or reader is null.
   */
  @SuppressWarnings("deprecation")
  byte[] getFileSplitPoint(CellComparator comparator) throws IOException {
    if (this.reader == null) {
      LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
      return null;
    }
    // Get first, last, and mid keys.  Midkey is the key that starts block
    // in middle of hfile.  Has column and timestamp.  Need to return just
    // the row we want to split on as midkey.
    Cell midkey = this.reader.midkey();
    if (midkey != null) {
      Cell firstKey = this.reader.getFirstKey();
      byte [] lk = this.reader.getLastKey();
      KeyValue lastKey = KeyValueUtil.createKeyValueFromKey(lk, 0, lk.length);
      // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
      if (comparator.compareRows(midkey, firstKey) == 0
          || comparator.compareRows(midkey, lastKey) == 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("cannot split because midkey is the same as first or last row");
        }
        return null;
      }
      return CellUtil.cloneRow(midkey);
    }
    return null;
  }

  /**
   * A StoreFile writer.  Use this to write HBase Store Files. It is package
   * local because it is an implementation detail of the HBase regionserver.
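   *
   * <p>A minimal write-sequence sketch (the {@code writer} here is assumed
   * to come from {@link StoreFile.WriterBuilder#build()}):
   * <pre>
   *   writer.append(cell);                     // repeat for each Cell
   *   writer.appendMetadata(maxSeqId, false);  // metadata must precede close()
   *   writer.close();
   * </pre>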
   */
  public static class Writer implements Compactor.CellSink {
    private final BloomFilterWriter generalBloomFilterWriter;
    private final BloomFilterWriter deleteFamilyBloomFilterWriter;
    private final BloomType bloomType;
    private byte[] lastBloomKey;
    private int lastBloomKeyOffset, lastBloomKeyLen;
    private Cell lastCell = null;
    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    private Cell lastDeleteFamilyCell = null;
    private long deleteFamilyCnt = 0;

    /** Bytes per Checksum */
    protected int bytesPerChecksum;

    TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
    /* isTimeRangeTrackerSet keeps track of whether the timeRange has already
     * been set. When flushing a memstore, we set the TimeRange and use this
     * variable to indicate that it doesn't need to be calculated again while
     * appending KeyValues. It is not set in the case of compactions, when it
     * is recalculated using only the appended KeyValues. */
    boolean isTimeRangeTrackerSet = false;

    protected HFile.Writer writer;
    private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;

    /**
     * Creates an HFile.Writer that also writes helpful metadata.
     * @param fs file system to write to
     * @param path file name to create
     * @param conf user configuration
     * @param comparator key comparator
     * @param bloomType bloom filter setting
     * @param maxKeys the expected maximum number of keys to be added. Was used
     *        for Bloom filter size in {@link HFile} format version 1.
     * @param favoredNodes
     * @param fileContext The HFile context
     * @throws IOException problem writing to FS
     */
    private Writer(FileSystem fs, Path path,
        final Configuration conf,
        CacheConfig cacheConf,
        final CellComparator comparator, BloomType bloomType, long maxKeys,
        InetSocketAddress[] favoredNodes, HFileContext fileContext)
            throws IOException {
      writer = HFile.getWriterFactory(conf, cacheConf)
          .withPath(fs, path)
          .withComparator(comparator)
          .withFavoredNodes(favoredNodes)
          .withFileContext(fileContext)
          .create();

      generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
          conf, cacheConf, bloomType,
          (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

      if (generalBloomFilterWriter != null) {
        this.bloomType = bloomType;
        if (this.bloomType == BloomType.ROWCOL) {
          lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue();
        }
        if (LOG.isTraceEnabled()) LOG.trace("Bloom filter type for " + path + ": " +
          this.bloomType + ", " + generalBloomFilterWriter.getClass().getSimpleName());
      } else {
        // Not using Bloom filters.
        this.bloomType = BloomType.NONE;
      }

      // initialize delete family Bloom filter when there is NO RowCol Bloom
      // filter
      if (this.bloomType != BloomType.ROWCOL) {
        this.deleteFamilyBloomFilterWriter = BloomFilterFactory
            .createDeleteBloomAtWrite(conf, cacheConf,
                (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      } else {
        deleteFamilyBloomFilterWriter = null;
      }
      if (deleteFamilyBloomFilterWriter != null) {
        if (LOG.isTraceEnabled()) LOG.trace("Delete Family Bloom filter type for " + path + ": "
            + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
      }
    }

    /**
     * Writes metadata.
     * Call before {@link #close()} since it's written as metadata to this file.
     * @param maxSequenceId Maximum sequence id.
     * @param majorCompaction True if this file is product of a major compaction
     * @throws IOException problem writing to FS
     */
    public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
      writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
      writer.appendFileInfo(MAJOR_COMPACTION_KEY,
          Bytes.toBytes(majorCompaction));
      appendTrackedTimestampsToMetadata();
    }

    /**
     * Add TimestampRange and earliest put timestamp to Metadata
     */
    public void appendTrackedTimestampsToMetadata() throws IOException {
      appendFileInfo(TIMERANGE_KEY, WritableUtils.toByteArray(timeRangeTracker));
      appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
    }

    /**
     * Set TimeRangeTracker
     * @param trt
     */
    public void setTimeRangeTracker(final TimeRangeTracker trt) {
      this.timeRangeTracker = trt;
      isTimeRangeTrackerSet = true;
    }

    /**
     * Record the earliest Put timestamp.
     *
     * If the timeRangeTracker is not set,
     * update the TimeRangeTracker to include the timestamp of this key.
     * @param cell
     */
    public void trackTimestamps(final Cell cell) {
      if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
        earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
      }
      if (!isTimeRangeTrackerSet) {
        timeRangeTracker.includeTimestamp(cell);
      }
    }

    private void appendGeneralBloomfilter(final Cell cell) throws IOException {
      if (this.generalBloomFilterWriter != null) {
        // only add to the bloom filter on a new, unique key
        boolean newKey = true;
        if (this.lastCell != null) {
          switch (bloomType) {
          case ROW:
            newKey = !CellUtil.matchingRows(cell, lastCell);
            break;
          case ROWCOL:
            newKey = !CellUtil.matchingRowColumn(cell, lastCell);
            break;
          case NONE:
            newKey = false;
            break;
          default:
            throw new IOException("Invalid Bloom filter type: " + bloomType +
                " (ROW or ROWCOL expected)");
          }
        }
        if (newKey) {
          /*
           * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
           * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
           *
           * 2 Types of Filtering:
           *  1. Row = Row
           *  2. RowCol = Row + Qualifier
           */
          byte[] bloomKey = null;
          // Used with ROW_COL bloom
          KeyValue bloomKeyKV = null;
          int bloomKeyOffset, bloomKeyLen;

          switch (bloomType) {
          case ROW:
            bloomKey = cell.getRowArray();
            bloomKeyOffset = cell.getRowOffset();
            bloomKeyLen = cell.getRowLength();
            break;
          case ROWCOL:
            // merge(row, qualifier)
            // TODO: could save one buffer copy in case of compound Bloom
            // filters when this involves creating a KeyValue
            bloomKeyKV = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength(),
                HConstants.EMPTY_BYTE_ARRAY, 0, 0, cell.getQualifierArray(),
                cell.getQualifierOffset(),
                cell.getQualifierLength());
            bloomKey = bloomKeyKV.getBuffer();
            bloomKeyOffset = bloomKeyKV.getKeyOffset();
            bloomKeyLen = bloomKeyKV.getKeyLength();
            break;
          default:
            throw new IOException("Invalid Bloom filter type: " + bloomType +
                " (ROW or ROWCOL expected)");
          }
          generalBloomFilterWriter.add(bloomKey, bloomKeyOffset, bloomKeyLen);
          if (lastBloomKey != null) {
            int res = 0;
            // hbase:meta does not have blooms. So we need not have special interpretation
            // of the hbase:meta cells.  We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
            if (bloomType == BloomType.ROW) {
              res = Bytes.BYTES_RAWCOMPARATOR.compare(bloomKey, bloomKeyOffset, bloomKeyLen,
                  lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen);
            } else {
              // TODO : Caching of kv components becomes important in these cases
              res = CellComparator.COMPARATOR.compare(bloomKeyKV, lastBloomKeyOnlyKV);
            }
            if (res <= 0) {
              throw new IOException("Non-increasing Bloom keys: "
                  + Bytes.toStringBinary(bloomKey, bloomKeyOffset, bloomKeyLen) + " after "
                  + Bytes.toStringBinary(lastBloomKey, lastBloomKeyOffset, lastBloomKeyLen));
            }
          }
          lastBloomKey = bloomKey;
          lastBloomKeyOffset = bloomKeyOffset;
          lastBloomKeyLen = bloomKeyLen;
          if (bloomType == BloomType.ROWCOL) {
            lastBloomKeyOnlyKV.setKey(bloomKey, bloomKeyOffset, bloomKeyLen);
          }
          this.lastCell = cell;
        }
      }
    }

    private void appendDeleteFamilyBloomFilter(final Cell cell)
        throws IOException {
      if (!CellUtil.isDeleteFamily(cell) && !CellUtil.isDeleteFamilyVersion(cell)) {
        return;
      }

      // increment the count of delete family cells in the store file
      deleteFamilyCnt++;
      if (null != this.deleteFamilyBloomFilterWriter) {
        boolean newKey = true;
        if (lastDeleteFamilyCell != null) {
          // hbase:meta does not have blooms. So we need not have special interpretation
          // of the hbase:meta cells
          newKey = !CellUtil.matchingRows(cell, lastDeleteFamilyCell);
        }
        if (newKey) {
          this.deleteFamilyBloomFilterWriter.add(cell.getRowArray(),
              cell.getRowOffset(), cell.getRowLength());
          this.lastDeleteFamilyCell = cell;
        }
      }
    }

    public void append(final Cell cell) throws IOException {
      appendGeneralBloomfilter(cell);
      appendDeleteFamilyBloomFilter(cell);
      writer.append(cell);
      trackTimestamps(cell);
    }

    public Path getPath() {
      return this.writer.getPath();
    }

    boolean hasGeneralBloom() {
      return this.generalBloomFilterWriter != null;
    }

    /**
     * For unit testing only.
     *
     * @return the Bloom filter used by this writer.
     */
    BloomFilterWriter getGeneralBloomWriter() {
      return generalBloomFilterWriter;
    }

    private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
      boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
      if (haveBloom) {
        bfw.compactBloom();
      }
      return haveBloom;
    }

    private boolean closeGeneralBloomFilter() throws IOException {
      boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

      // add the general Bloom filter writer and append file info
      if (hasGeneralBloom) {
        writer.addGeneralBloomFilter(generalBloomFilterWriter);
        writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY,
            Bytes.toBytes(bloomType.toString()));
        if (lastBloomKey != null) {
          writer.appendFileInfo(LAST_BLOOM_KEY, Arrays.copyOfRange(
              lastBloomKey, lastBloomKeyOffset, lastBloomKeyOffset
                  + lastBloomKeyLen));
        }
      }
      return hasGeneralBloom;
    }

    private boolean closeDeleteFamilyBloomFilter() throws IOException {
      boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

      // add the delete family Bloom filter writer
      if (hasDeleteFamilyBloom) {
        writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
      }

      // append file info about the number of delete family kvs
      // even if there is no delete family Bloom.
      writer.appendFileInfo(DELETE_FAMILY_COUNT,
          Bytes.toBytes(this.deleteFamilyCnt));

      return hasDeleteFamilyBloom;
    }

    public void close() throws IOException {
      boolean hasGeneralBloom = this.closeGeneralBloomFilter();
      boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

      writer.close();

      // Log final Bloom filter statistics. This needs to be done after close()
      // because compound Bloom filters might be finalized as part of closing.
      if (StoreFile.LOG.isTraceEnabled()) {
        StoreFile.LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
          (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " +
          getPath());
      }
    }

    public void appendFileInfo(byte[] key, byte[] value) throws IOException {
      writer.appendFileInfo(key, value);
    }

    /** For use in testing, e.g. {@link org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile}
     */
    HFile.Writer getHFileWriter() {
      return writer;
    }
  }

  /**
   * Reader for a StoreFile.
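   *
   * <p>A brief usage sketch (assuming {@code sf} is an open {@link StoreFile};
   * the boolean arguments are illustrative only):
   * <pre>
   *   StoreFile.Reader r = sf.createReader();
   *   StoreFileScanner s = r.getStoreFileScanner(true, false); // cacheBlocks, pread
   * </pre>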
   */
  public static class Reader {
    private static final Log LOG = LogFactory.getLog(Reader.class.getName());

    protected BloomFilter generalBloomFilter = null;
    protected BloomFilter deleteFamilyBloomFilter = null;
    protected BloomType bloomFilterType;
    private final HFile.Reader reader;
    protected TimeRangeTracker timeRangeTracker = null;
    protected long sequenceID = -1;
    private byte[] lastBloomKey;
    private long deleteFamilyCnt = -1;
    private boolean bulkLoadResult = false;
    private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;

    public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
        throws IOException {
      reader = HFile.createReader(fs, path, cacheConf, conf);
      bloomFilterType = BloomType.NONE;
    }

    public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
        CacheConfig cacheConf, Configuration conf) throws IOException {
      reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
      bloomFilterType = BloomType.NONE;
    }

    /**
     * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
     */
    Reader() {
      this.reader = null;
    }

    public CellComparator getComparator() {
      return reader.getComparator();
    }

    /**
     * Get a scanner to scan over this StoreFile. Do not use
     * this overload if using this scanner for compactions.
     *
     * @param cacheBlocks should this scanner cache blocks?
     * @param pread use pread (for highly concurrent small readers)
     * @return a scanner
     */
    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                               boolean pread) {
      return getStoreFileScanner(cacheBlocks, pread, false,
        // 0 is passed as readpoint because this method is only used by test
        // where StoreFile is directly operated upon
        0);
    }

    /**
     * Get a scanner to scan over this StoreFile.
     *
     * @param cacheBlocks should this scanner cache blocks?
     * @param pread use pread (for highly concurrent small readers)
     * @param isCompaction is scanner being used for compaction?
     * @return a scanner
     */
    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                               boolean pread,
                                               boolean isCompaction, long readPt) {
      return new StoreFileScanner(this,
                                 getScanner(cacheBlocks, pread, isCompaction),
                                 !isCompaction, reader.hasMVCCInfo(), readPt);
    }

    /**
     * Warning: Do not write further code which depends on this call. Instead
     * use getStoreFileScanner() which uses the StoreFileScanner class/interface
     * which is the preferred way to scan a store with higher level concepts.
     *
     * @param cacheBlocks should we cache the blocks?
     * @param pread use pread (for concurrent small readers)
     * @return the underlying HFileScanner
     */
    @Deprecated
    public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
      return getScanner(cacheBlocks, pread, false);
    }

    /**
     * Warning: Do not write further code which depends on this call. Instead
     * use getStoreFileScanner() which uses the StoreFileScanner class/interface
     * which is the preferred way to scan a store with higher level concepts.
     *
     * @param cacheBlocks
     *          should we cache the blocks?
     * @param pread
     *          use pread (for concurrent small readers)
     * @param isCompaction
     *          is scanner being used for compaction?
     * @return the underlying HFileScanner
     */
    @Deprecated
    public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
        boolean isCompaction) {
      return reader.getScanner(cacheBlocks, pread, isCompaction);
    }

    public void close(boolean evictOnClose) throws IOException {
      reader.close(evictOnClose);
    }

    /**
     * Check if this storeFile may contain keys within the TimeRange that
     * have not expired (i.e. not older than oldestUnexpiredTS).
     * @param scan the current scan
     * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
     *          determined by the column family's TTL
     * @return false if queried keys definitely don't exist in this StoreFile
     */
    boolean passesTimerangeFilter(Scan scan, long oldestUnexpiredTS) {
      if (timeRangeTracker == null) {
        return true;
      } else {
        return timeRangeTracker.includesTimeRange(scan.getTimeRange()) &&
            timeRangeTracker.getMaximumTimestamp() >= oldestUnexpiredTS;
      }
    }

    /**
     * Checks whether the given scan passes the Bloom filter (if present). Only
     * checks Bloom filters for single-row or single-row-column scans. Bloom
     * filter checking for multi-gets is implemented as part of the store
     * scanner system (see {@link StoreFileScanner#seekExactly}) and uses
     * the lower-level API {@link #passesGeneralBloomFilter(byte[], int, int, byte[],
     * int, int)}.
     *
     * @param scan the scan specification. Used to determine the row, and to
     *          check whether this is a single-row ("get") scan.
     * @param columns the set of columns. Only used for row-column Bloom
     *          filters.
     * @return true if the scan with the given column set passes the Bloom
     *         filter, or if the Bloom filter is not applicable for the scan.
     *         False if the Bloom filter is applicable and the scan fails it.
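     *
     * <p>A sketch of a qualifying single-row "get" scan (hypothetical values;
     * {@code columns} is the column set tracked by the caller). A scan built
     * from a Get has equal start and stop rows, so it is treated as a get:
     * <pre>
     *   Scan get = new Scan(new Get(Bytes.toBytes("row1")));
     *   boolean mayContain = reader.passesBloomFilter(get, columns);
     * </pre>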
     */
    boolean passesBloomFilter(Scan scan,
        final SortedSet<byte[]> columns) {
      // Multi-column non-get scans will use Bloom filters through the
      // lower-level API function that this function calls.
      if (!scan.isGetScan()) {
        return true;
      }

      byte[] row = scan.getStartRow();
      switch (this.bloomFilterType) {
        case ROW:
          return passesGeneralBloomFilter(row, 0, row.length, null, 0, 0);

        case ROWCOL:
          if (columns != null && columns.size() == 1) {
            byte[] column = columns.first();
            return passesGeneralBloomFilter(row, 0, row.length, column, 0,
                column.length);
          }

          // For multi-column queries the Bloom filter is checked from the
          // seekExact operation.
          return true;

        default:
          return true;
      }
    }

    public boolean passesDeleteFamilyBloomFilter(byte[] row, int rowOffset,
        int rowLen) {
      // Cache Bloom filter as a local variable in case it is set to null by
      // another thread on an IO error.
      BloomFilter bloomFilter = this.deleteFamilyBloomFilter;

      // Empty file or there is no delete family at all
      if (reader.getTrailer().getEntryCount() == 0 || deleteFamilyCnt == 0) {
        return false;
      }

      if (bloomFilter == null) {
        return true;
      }

      try {
        if (!bloomFilter.supportsAutoLoading()) {
          return true;
        }
        return bloomFilter.contains(row, rowOffset, rowLen, null);
      } catch (IllegalArgumentException e) {
        LOG.error("Bad Delete Family bloom filter data -- proceeding without",
            e);
        setDeleteFamilyBloomFilterFaulty();
      }

      return true;
    }

    /**
     * A method for checking Bloom filters. Called directly from
     * StoreFileScanner in case of a multi-column query.
     *
     * @param row
     * @param rowOffset
     * @param rowLen
     * @param col
     * @param colOffset
     * @param colLen
     * @return True if passes
     */
    public boolean passesGeneralBloomFilter(byte[] row, int rowOffset,
        int rowLen, byte[] col, int colOffset, int colLen) {
      // Cache Bloom filter as a local variable in case it is set to null by
      // another thread on an IO error.
      BloomFilter bloomFilter = this.generalBloomFilter;
      if (bloomFilter == null) {
        return true;
      }

      // Used in ROW bloom
      byte[] key = null;
      // Used in ROW_COL bloom
      KeyValue kvKey = null;
      switch (bloomFilterType) {
        case ROW:
          if (col != null) {
            throw new RuntimeException("Row-only Bloom filter called with " +
                "column specified");
          }
          if (rowOffset != 0 || rowLen != row.length) {
            throw new AssertionError("For row-only Bloom filters the row "
                + "must occupy the whole array");
          }
          key = row;
          break;

        case ROWCOL:
          kvKey = KeyValueUtil.createFirstOnRow(row, rowOffset, rowLen,
              HConstants.EMPTY_BYTE_ARRAY, 0, 0, col, colOffset,
              colLen);
          break;

        default:
          return true;
      }

      // Empty file
      if (reader.getTrailer().getEntryCount() == 0)
        return false;

      try {
        boolean shouldCheckBloom;
        ByteBuffer bloom;
        if (bloomFilter.supportsAutoLoading()) {
          bloom = null;
          shouldCheckBloom = true;
        } else {
          bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY,
              true);
          shouldCheckBloom = bloom != null;
        }

        if (shouldCheckBloom) {
          boolean exists;

          // Whether the primary Bloom key is greater than the last Bloom key
          // from the file info. For row-column Bloom filters this is not yet
          // a sufficient condition to return false.
          boolean keyIsAfterLast = (lastBloomKey != null);
          // hbase:meta does not have blooms. So we need not have special interpretation
          // of the hbase:meta cells.  We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
          if (keyIsAfterLast) {
            if (bloomFilterType == BloomType.ROW) {
              keyIsAfterLast = (Bytes.BYTES_RAWCOMPARATOR.compare(key, lastBloomKey) > 0);
            } else {
              keyIsAfterLast = (CellComparator.COMPARATOR.compare(kvKey, lastBloomKeyOnlyKV)) > 0;
            }
          }

          if (bloomFilterType == BloomType.ROWCOL) {
            // Since a Row Delete is essentially a DeleteFamily applied to all
            // columns, a file might be skipped if using row+col Bloom filter.
            // In order to ensure this file is included an additional check is
            // required looking only for a row bloom.
            KeyValue rowBloomKey = KeyValueUtil.createFirstOnRow(row, rowOffset, rowLen,
                HConstants.EMPTY_BYTE_ARRAY, 0, 0, HConstants.EMPTY_BYTE_ARRAY, 0, 0);
            // hbase:meta does not have blooms. So we need not have special interpretation
            // of the hbase:meta cells.  We can safely use Bytes.BYTES_RAWCOMPARATOR for ROW Bloom
            if (keyIsAfterLast
                && (CellComparator.COMPARATOR.compare(rowBloomKey, lastBloomKeyOnlyKV)) > 0) {
              exists = false;
            } else {
              exists =
                  bloomFilter.contains(kvKey, bloom) ||
                  bloomFilter.contains(rowBloomKey, bloom);
            }
          } else {
            exists = !keyIsAfterLast
                && bloomFilter.contains(key, 0, key.length, bloom);
          }

          return exists;
        }
      } catch (IOException e) {
        LOG.error("Error reading bloom filter data -- proceeding without",
            e);
        setGeneralBloomFilterFaulty();
      } catch (IllegalArgumentException e) {
        LOG.error("Bad bloom filter data -- proceeding without", e);
        setGeneralBloomFilterFaulty();
      }

      return true;
    }

    /**
     * Checks whether the given scan rowkey range overlaps with the current
     * storefile's key range.
     * @param scan the scan specification. Used to determine the rowkey range.
     * @return true if there is overlap, false otherwise
     */
    public boolean passesKeyRangeFilter(Scan scan) {
      if (this.getFirstKey() == null || this.getLastKey() == null) {
        // the file is empty
        return false;
      }
      if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)
          && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
        return true;
      }
      KeyValue smallestScanKeyValue = scan.isReversed() ? KeyValueUtil
          .createFirstOnRow(scan.getStopRow()) : KeyValueUtil.createFirstOnRow(scan
          .getStartRow());
      KeyValue largestScanKeyValue = scan.isReversed() ? KeyValueUtil
          .createLastOnRow(scan.getStartRow()) : KeyValueUtil.createLastOnRow(scan
          .getStopRow());
      // TODO this is in hot path? Optimize and avoid 2 extra object creations.
      Cell firstKeyKV = this.getFirstKey();
      KeyValue.KeyOnlyKeyValue lastKeyKV =
          new KeyValue.KeyOnlyKeyValue(this.getLastKey(), 0, this.getLastKey().length);
      boolean nonOverLapping = ((getComparator().compare(firstKeyKV, largestScanKeyValue)) > 0
          && !Bytes
          .equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
              HConstants.EMPTY_END_ROW))
          || (getComparator().compare(lastKeyKV, smallestScanKeyValue)) < 0;
      return !nonOverLapping;
    }

    public Map<byte[], byte[]> loadFileInfo() throws IOException {
      Map<byte [], byte []> fi = reader.loadFileInfo();

      byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
      if (b != null) {
        bloomFilterType = BloomType.valueOf(Bytes.toString(b));
      }

      lastBloomKey = fi.get(LAST_BLOOM_KEY);
      if (bloomFilterType == BloomType.ROWCOL) {
        lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue(lastBloomKey, 0, lastBloomKey.length);
      }
      byte[] cnt = fi.get(DELETE_FAMILY_COUNT);
      if (cnt != null) {
        deleteFamilyCnt = Bytes.toLong(cnt);
      }

      return fi;
    }

    public void loadBloomfilter() {
      this.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
      this.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
    }

    private void loadBloomfilter(BlockType blockType) {
      try {
        if (blockType == BlockType.GENERAL_BLOOM_META) {
          if (this.generalBloomFilter != null)
            return; // Bloom has been loaded

          DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
          if (bloomMeta != null) {
            // sanity check for NONE Bloom filter
            if (bloomFilterType == BloomType.NONE) {
              throw new IOException(
                  "valid bloom filter type not found in FileInfo");
            } else {
              generalBloomFilter = BloomFilterFactory.createFromMeta(bloomMeta,
                  reader);
              if (LOG.isTraceEnabled()) {
                LOG.trace("Loaded " + bloomFilterType.toString() + " "
                  + generalBloomFilter.getClass().getSimpleName()
                  + " metadata for " + reader.getName());
              }
            }
          }
        } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
          if (this.deleteFamilyBloomFilter != null)
            return; // Bloom has been loaded

          DataInput bloomMeta = reader.getDeleteBloomFilterMetadata();
          if (bloomMeta != null) {
            deleteFamilyBloomFilter = BloomFilterFactory.createFromMeta(
                bloomMeta, reader);
            LOG.info("Loaded Delete Family Bloom ("
                + deleteFamilyBloomFilter.getClass().getSimpleName()
                + ") metadata for " + reader.getName());
          }
        } else {
1444           throw new RuntimeException("Block Type: " + blockType.toString()
1445               + " is not supported for Bloom filter");
1446         }
1447       } catch (IOException e) {
1448         LOG.error("Error reading bloom filter meta for " + blockType
1449             + " -- proceeding without", e);
1450         setBloomFilterFaulty(blockType);
1451       } catch (IllegalArgumentException e) {
1452         LOG.error("Bad bloom filter meta " + blockType
1453             + " -- proceeding without", e);
1454         setBloomFilterFaulty(blockType);
1455       }
1456     }
1457 
1458     private void setBloomFilterFaulty(BlockType blockType) {
1459       if (blockType == BlockType.GENERAL_BLOOM_META) {
1460         setGeneralBloomFilterFaulty();
1461       } else if (blockType == BlockType.DELETE_FAMILY_BLOOM_META) {
1462         setDeleteFamilyBloomFilterFaulty();
1463       }
1464     }
1465 
1466     /**
1467      * The number of Bloom filter entries in this store file, or an estimate
1468      * thereof, if the Bloom filter is not loaded. This always returns an upper
1469      * bound of the number of Bloom filter entries.
1470      *
1471      * @return an estimate of the number of Bloom filter entries in this file
1472      */
1473     public long getFilterEntries() {
1474       return generalBloomFilter != null ? generalBloomFilter.getKeyCount()
1475           : reader.getEntries();
1476     }
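    // Note (added annotation): reader.getEntries() is a valid upper bound
    // because a Bloom filter holds at most one key per cell (one per row for
    // ROW, one per row+column for ROWCOL), so the file's total cell count can
    // never be smaller than the Bloom key count.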
1477 
1478     public void setGeneralBloomFilterFaulty() {
1479       generalBloomFilter = null;
1480     }
1481 
1482     public void setDeleteFamilyBloomFilterFaulty() {
1483       this.deleteFamilyBloomFilter = null;
1484     }
1485 
1486     public byte[] getLastKey() {
1487       return reader.getLastKey();
1488     }
1489 
1490     public byte[] getLastRowKey() {
1491       return reader.getLastRowKey();
1492     }
1493 
1494     public Cell midkey() throws IOException {
1495       return reader.midkey();
1496     }
1497 
1498     public long length() {
1499       return reader.length();
1500     }
1501 
1502     public long getTotalUncompressedBytes() {
1503       return reader.getTrailer().getTotalUncompressedBytes();
1504     }
1505 
1506     public long getEntries() {
1507       return reader.getEntries();
1508     }
1509 
1510     public long getDeleteFamilyCnt() {
1511       return deleteFamilyCnt;
1512     }
1513 
1514     public Cell getFirstKey() {
1515       return reader.getFirstKey();
1516     }
1517 
1518     public long indexSize() {
1519       return reader.indexSize();
1520     }
1521 
1522     public BloomType getBloomFilterType() {
1523       return this.bloomFilterType;
1524     }
1525 
1526     public long getSequenceID() {
1527       return sequenceID;
1528     }
1529 
1530     public void setSequenceID(long sequenceID) {
1531       this.sequenceID = sequenceID;
1532     }
1533 
1534     public void setBulkLoaded(boolean bulkLoadResult) {
1535       this.bulkLoadResult = bulkLoadResult;
1536     }
1537 
1538     public boolean isBulkLoaded() {
1539       return this.bulkLoadResult;
1540     }
1541 
1542     BloomFilter getGeneralBloomFilter() {
1543       return generalBloomFilter;
1544     }
1545 
1546     long getUncompressedDataIndexSize() {
1547       return reader.getTrailer().getUncompressedDataIndexSize();
1548     }
1549 
1550     public long getTotalBloomSize() {
1551       if (generalBloomFilter == null)
1552         return 0;
1553       return generalBloomFilter.getByteSize();
1554     }
1555 
1556     public int getHFileVersion() {
1557       return reader.getTrailer().getMajorVersion();
1558     }
1559 
1560     public int getHFileMinorVersion() {
1561       return reader.getTrailer().getMinorVersion();
1562     }
1563 
1564     public HFile.Reader getHFileReader() {
1565       return reader;
1566     }
1567 
1568     void disableBloomFilterForTesting() {
1569       generalBloomFilter = null;
1570       this.deleteFamilyBloomFilter = null;
1571     }
1572 
1573     public long getMaxTimestamp() {
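      // No tracker means the timestamp range is unknown; report Long.MAX_VALUE
      // so the file is conservatively treated as possibly holding the newest data.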
1574       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
1575     }
1576   }
1577 
1578   /**
1579    * Useful comparators for comparing StoreFiles.
1580    */
1581   public abstract static class Comparators {
1582     /**
1583      * Comparator that orders StoreFiles by their sequence ids.
1584      * Bulk loads that did not request a seq id are given a seq id of -1;
1585      * thus, they sort before all non-bulk loads and before bulk loads
1586      * that were assigned a sequence id. Among files with equal seq ids,
1587      * larger files sort first, then files with an earlier bulkLoadTime,
1588      * with the path name as the final tie-breaker.
1589      */
1590     public static final Comparator<StoreFile> SEQ_ID =
1591       Ordering.compound(ImmutableList.of(
1592           Ordering.natural().onResultOf(new GetSeqId()),
1593           Ordering.natural().onResultOf(new GetFileSize()).reverse(),
1594           Ordering.natural().onResultOf(new GetBulkTime()),
1595           Ordering.natural().onResultOf(new GetPathName())
1596       ));
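    /*
     * Usage illustration (added annotation, not part of the original file):
     * a caller might sort a store's files oldest-first with this comparator,
     * e.g.:
     *
     *   List<StoreFile> files = new ArrayList<StoreFile>(storeFiles);
     *   Collections.sort(files, StoreFile.Comparators.SEQ_ID);
     *   // files.get(files.size() - 1) is now the newest file
     */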
1597 
1598     private static class GetSeqId implements Function<StoreFile, Long> {
1599       @Override
1600       public Long apply(StoreFile sf) {
1601         return sf.getMaxSequenceId();
1602       }
1603     }
1604 
1605     private static class GetFileSize implements Function<StoreFile, Long> {
1606       @Override
1607       public Long apply(StoreFile sf) {
1608         return sf.getReader().length();
1609       }
1610     }
1611 
1612     private static class GetBulkTime implements Function<StoreFile, Long> {
1613       @Override
1614       public Long apply(StoreFile sf) {
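        // Files that are not bulk-load results carry no bulk timestamp; use
        // Long.MAX_VALUE so they sort after all bulk loads on this key.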
1615         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1616         return sf.getBulkLoadTimestamp();
1617       }
1618     }
1619 
1620     private static class GetPathName implements Function<StoreFile, String> {
1621       @Override
1622       public String apply(StoreFile sf) {
1623         return sf.getPath().getName();
1624       }
1625     }
1626   }
1627 }