1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.Closeable;
23  import java.io.DataInput;
24  import java.io.DataInputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.io.SequenceInputStream;
28  import java.nio.ByteBuffer;
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.Comparator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.SortedMap;
36  import java.util.TreeMap;
37  import java.util.concurrent.ArrayBlockingQueue;
38  import java.util.concurrent.BlockingQueue;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.classification.InterfaceAudience;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.fs.FSDataInputStream;
47  import org.apache.hadoop.fs.FSDataOutputStream;
48  import org.apache.hadoop.fs.FileStatus;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.fs.PathFilter;
52  import org.apache.hadoop.hbase.HColumnDescriptor;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.KeyValue;
55  import org.apache.hadoop.hbase.KeyValue.KeyComparator;
56  import org.apache.hadoop.hbase.exceptions.CorruptHFileException;
57  import org.apache.hadoop.hbase.fs.HFileSystem;
58  import org.apache.hadoop.hbase.io.compress.Compression;
59  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
60  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
62  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
63  import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
64  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
65  import org.apache.hadoop.hbase.regionserver.StoreFile.WriterBuilder;
66  import org.apache.hadoop.hbase.util.BloomFilterWriter;
67  import org.apache.hadoop.hbase.util.Bytes;
68  import org.apache.hadoop.hbase.util.ChecksumType;
69  import org.apache.hadoop.hbase.util.FSUtils;
70  import org.apache.hadoop.io.RawComparator;
71  import org.apache.hadoop.io.Writable;
72  
73  import com.google.common.base.Preconditions;
74  import com.google.common.collect.Lists;
75  import com.google.protobuf.ByteString;
76  
77  /**
78   * File format for hbase.
79   * A file of sorted key/value pairs. Both keys and values are byte arrays.
80   * <p>
81   * The memory footprint of a HFile includes the following (below is taken from the
82   * <a
83   * href=https://issues.apache.org/jira/browse/HADOOP-3315>TFile</a> documentation
84   * but applies also to HFile):
85   * <ul>
86   * <li>Some constant overhead of reading or writing a compressed block.
87   * <ul>
88   * <li>Each compressed block requires one compression/decompression codec for
89   * I/O.
90   * <li>Temporary space to buffer the key.
91   * <li>Temporary space to buffer the value.
92   * </ul>
93   * <li>HFile index, which is proportional to the total number of Data Blocks.
94   * The total amount of memory needed to hold the index can be estimated as
95   * (56+AvgKeySize)*NumBlocks.
96   * </ul>
97   * Suggestions on performance optimization.
98   * <ul>
 * <li>Minimum block size. We recommend a setting of minimum block size between
 * 8KB and 1MB for general usage. Larger block size is preferred if files are
 * primarily for sequential access. However, it would lead to inefficient random
 * access (because there is more data to decompress). Smaller blocks are good
103  * for random access, but require more memory to hold the block index, and may
104  * be slower to create (because we must flush the compressor stream at the
105  * conclusion of each data block, which leads to an FS I/O flush). Further, due
106  * to the internal caching in Compression codec, the smallest possible block
107  * size would be around 20KB-30KB.
108  * <li>The current implementation does not offer true multi-threading for
109  * reading. The implementation uses FSDataInputStream seek()+read(), which is
110  * shown to be much faster than positioned-read call in single thread mode.
111  * However, it also means that if multiple threads attempt to access the same
112  * HFile (using multiple scanners) simultaneously, the actual I/O is carried out
113  * sequentially even if they access different DFS blocks (Reexamine! pread seems
114  * to be 10% faster than seek+read in my testing -- stack).
 * <li>Compression codec. Use "none" if the data is not very compressible (by
 * compressible, I mean a compression ratio of at least 2:1). Generally, use "lzo"
 * as the starting point for experimenting. "gz" offers a slightly better
 * compression ratio than "lzo" but requires 4x CPU to compress and 2x CPU to
 * decompress, compared to "lzo".
120  * </ul>
121  *
122  * For more on the background behind HFile, see <a
123  * href=https://issues.apache.org/jira/browse/HBASE-61>HBASE-61</a>.
124  * <p>
125  * File is made of data blocks followed by meta data blocks (if any), a fileinfo
126  * block, data block index, meta data block index, and a fixed size trailer
127  * which records the offsets at which file changes content type.
128  * <pre>&lt;data blocks>&lt;meta blocks>&lt;fileinfo>&lt;data index>&lt;meta index>&lt;trailer></pre>
 * Each block has a bit of magic at its start.  Blocks are comprised of
 * key/values.  In data blocks, they are both byte arrays.  Metadata blocks are
 * a String key and a byte array value.  An empty file looks like this:
 * <pre>&lt;fileinfo>&lt;trailer></pre>.  That is, there are neither data nor meta
 * blocks present.
134  * <p>
135  * TODO: Do scanners need to be able to take a start and end row?
136  * TODO: Should BlockIndex know the name of its file?  Should it have a Path
137  * that points at its file say for the case where an index lives apart from
138  * an HFile instance?
139  */
140 @InterfaceAudience.Private
141 public class HFile {
  /** Logger for HFile (package-private, so visible to the rest of this package). */
  static final Log LOG = LogFactory.getLog(HFile.class);

  /**
   * Maximum length of key in HFile.
   */
  public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE;

  /**
   * Default compression: none.
   */
  public final static Compression.Algorithm DEFAULT_COMPRESSION_ALGORITHM =
    Compression.Algorithm.NONE;

  /** Minimum supported HFile format version */
  public static final int MIN_FORMAT_VERSION = 1;

  /** Maximum supported HFile format version. Also the version used by
   * getFormatVersion when {@code hfile.format.version} is not configured.
   */
  public static final int MAX_FORMAT_VERSION = 2;

  /** Default compression name: none. */
  public final static String DEFAULT_COMPRESSION =
    DEFAULT_COMPRESSION_ALGORITHM.getName();

  /** Meta data block name for bloom filter bits. */
  public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";

  /**
   * We assume that HFile path ends with
   * ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
   * many levels of nesting. This is needed for identifying table and CF name
   * from an HFile path.
   */
  public final static int MIN_NUM_HFILE_PATH_LEVELS = 5;

  /**
   * The default number of bytes per checksum (16 KB); used as the
   * WriterFactory default.
   */
  public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
  /** Default checksum algorithm for new files; used as the WriterFactory default. */
  public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;

  // For measuring latency of "sequential" reads and writes.
  // Each counter/accumulator is read-and-reset by its getXxx() accessor below.
  private static final AtomicInteger readOps = new AtomicInteger();
  private static final AtomicLong readTimeNano = new AtomicLong();
  private static final AtomicInteger writeOps = new AtomicInteger();
  private static final AtomicLong writeTimeNano = new AtomicLong();

  // For measuring latency of pread
  private static final AtomicInteger preadOps = new AtomicInteger();
  private static final AtomicLong preadTimeNano = new AtomicLong();

  // For measuring number of checksum failures. Read-and-reset by
  // getChecksumFailuresCount(); presumably incremented by block readers
  // elsewhere in this package (it is package-private) -- confirm.
  static final AtomicLong checksumFailures = new AtomicLong();

  // For getting more detailed stats on FS latencies.
  // If, for some reason, the metrics subsystem stops polling for latencies,
  // we don't want data to pile up in a memory leak, so each queue is bounded
  // at LATENCY_BUFFER_SIZE entries. Once a queue is full, further samples are
  // silently dropped (offer() returns false and the result is ignored; nothing
  // is logged) until the queue is drained by the matching getXxxLatenciesNanos().
  private static final int LATENCY_BUFFER_SIZE = 5000;
  private static final BlockingQueue<Long> fsReadLatenciesNanos = 
      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
  private static final BlockingQueue<Long> fsWriteLatenciesNanos = 
      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
  private static final BlockingQueue<Long> fsPreadLatenciesNanos = 
      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
208   
209   public static final void offerReadLatency(long latencyNanos, boolean pread) {
210     if (pread) {
211       fsPreadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
212       preadOps.incrementAndGet();
213       preadTimeNano.addAndGet(latencyNanos);
214     } else {
215       fsReadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
216       readTimeNano.addAndGet(latencyNanos);
217       readOps.incrementAndGet();
218     }
219   }
220   
221   public static final void offerWriteLatency(long latencyNanos) {
222     fsWriteLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
223     
224     writeTimeNano.addAndGet(latencyNanos);
225     writeOps.incrementAndGet();
226   }
227   
228   public static final Collection<Long> getReadLatenciesNanos() {
229     final List<Long> latencies = 
230         Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size());
231     fsReadLatenciesNanos.drainTo(latencies);
232     return latencies;
233   }
234 
235   public static final Collection<Long> getPreadLatenciesNanos() {
236     final List<Long> latencies = 
237         Lists.newArrayListWithCapacity(fsPreadLatenciesNanos.size());
238     fsPreadLatenciesNanos.drainTo(latencies);
239     return latencies;
240   }
241   
242   public static final Collection<Long> getWriteLatenciesNanos() {
243     final List<Long> latencies = 
244         Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size());
245     fsWriteLatenciesNanos.drainTo(latencies);
246     return latencies;
247   }
248 
  // for test purpose: presumably counts data-block reads (it is not updated
  // anywhere in this section -- confirm where it is incremented).
  // NOTE(review): 'volatile' only affects reassignment of the reference; the
  // AtomicLong itself is already thread-safe for get/increment.
  public static volatile AtomicLong dataBlockReadCnt = new AtomicLong(0);
251 
252   // number of sequential reads
253   public static final int getReadOps() {
254     return readOps.getAndSet(0);
255   }
256 
257   public static final long getReadTimeMs() {
258     return readTimeNano.getAndSet(0) / 1000000;
259   }
260 
261   // number of positional reads
262   public static final int getPreadOps() {
263     return preadOps.getAndSet(0);
264   }
265 
266   public static final long getPreadTimeMs() {
267     return preadTimeNano.getAndSet(0) / 1000000;
268   }
269 
270   public static final int getWriteOps() {
271     return writeOps.getAndSet(0);
272   }
273 
274   public static final long getWriteTimeMs() {
275     return writeTimeNano.getAndSet(0) / 1000000;
276   }
277 
278   /**
279    * Number of checksum verification failures. It also
280    * clears the counter.
281    */
282   public static final long getChecksumFailuresCount() {
283     return checksumFailures.getAndSet(0);
284   }
285 
  /**
   * API required to write an {@link HFile}.
   * <p>
   * Instances are obtained from a {@link WriterFactory}; being {@link Closeable},
   * they must be closed when writing is finished.
   */
  public interface Writer extends Closeable {

    /**
     * Add an element to the file info map.
     *
     * @param key file info key
     * @param value file info value
     * @throws IOException if the entry cannot be recorded (implementation-defined)
     */
    void appendFileInfo(byte[] key, byte[] value) throws IOException;

    /**
     * Appends a {@link KeyValue} to the file.
     * NOTE(review): ordering requirements on successive keys are enforced by
     * implementations, not by this interface -- confirm in HFileWriterV2.
     */
    void append(KeyValue kv) throws IOException;

    /**
     * Appends a raw key/value pair to the file. Same ordering caveat as
     * {@link #append(KeyValue)}.
     */
    void append(byte[] key, byte[] value) throws IOException;

    /** @return the path to this {@link HFile} */
    Path getPath();

    /**
     * Adds an inline block writer such as a multi-level block index writer or
     * a compound Bloom filter writer.
     *
     * @param bloomWriter the inline block writer to register; despite the
     *     parameter name, any {@link InlineBlockWriter} is accepted
     */
    void addInlineBlockWriter(InlineBlockWriter bloomWriter);

    // The below three methods take Writables.  We'd like to undo Writables but undoing the below would be pretty
    // painful.  Could take a byte [] or a Message but we want to be backward compatible around hfiles so would need
    // to map between Message and Writable or byte [] and current Writable serialization.  This would be a bit of work
    // to little gain.  Thats my thinking at moment.  St.Ack 20121129

    /**
     * Registers a named meta block whose contents come from {@code metaWriter}.
     *
     * @param bloomFilterMetaKey name of the meta block (despite the parameter
     *     name, presumably any meta block name -- confirm)
     * @param metaWriter the Writable that supplies the block contents
     */
    void appendMetaBlock(String bloomFilterMetaKey, Writable metaWriter);

    /**
     * Store general Bloom filter in the file. This does not deal with Bloom filter
     * internals but is necessary, since Bloom filters are stored differently
     * in HFile version 1 and version 2.
     */
    void addGeneralBloomFilter(BloomFilterWriter bfw);

    /**
     * Store delete family Bloom filter in the file, which is only supported in
     * HFile V2.
     */
    void addDeleteFamilyBloomFilter(BloomFilterWriter bfw) throws IOException;
  }
325 
326   /**
327    * This variety of ways to construct writers is used throughout the code, and
328    * we want to be able to swap writer implementations.
329    */
330   public static abstract class WriterFactory {
331     protected final Configuration conf;
332     protected final CacheConfig cacheConf;
333     protected FileSystem fs;
334     protected Path path;
335     protected FSDataOutputStream ostream;
336     protected int blockSize = HColumnDescriptor.DEFAULT_BLOCKSIZE;
337     protected Compression.Algorithm compression =
338         HFile.DEFAULT_COMPRESSION_ALGORITHM;
339     protected HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
340     protected KeyComparator comparator;
341     protected ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
342     protected int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
343     protected boolean includeMVCCReadpoint = true;
344 
345     WriterFactory(Configuration conf, CacheConfig cacheConf) {
346       this.conf = conf;
347       this.cacheConf = cacheConf;
348     }
349 
350     public WriterFactory withPath(FileSystem fs, Path path) {
351       Preconditions.checkNotNull(fs);
352       Preconditions.checkNotNull(path);
353       this.fs = fs;
354       this.path = path;
355       return this;
356     }
357 
358     public WriterFactory withOutputStream(FSDataOutputStream ostream) {
359       Preconditions.checkNotNull(ostream);
360       this.ostream = ostream;
361       return this;
362     }
363 
364     public WriterFactory withBlockSize(int blockSize) {
365       this.blockSize = blockSize;
366       return this;
367     }
368 
369     public WriterFactory withCompression(Compression.Algorithm compression) {
370       Preconditions.checkNotNull(compression);
371       this.compression = compression;
372       return this;
373     }
374 
375     public WriterFactory withCompression(String compressAlgo) {
376       Preconditions.checkNotNull(compression);
377       this.compression = AbstractHFileWriter.compressionByName(compressAlgo);
378       return this;
379     }
380 
381     public WriterFactory withDataBlockEncoder(HFileDataBlockEncoder encoder) {
382       Preconditions.checkNotNull(encoder);
383       this.encoder = encoder;
384       return this;
385     }
386 
387     public WriterFactory withComparator(KeyComparator comparator) {
388       Preconditions.checkNotNull(comparator);
389       this.comparator = comparator;
390       return this;
391     }
392 
393     public WriterFactory withChecksumType(ChecksumType checksumType) {
394       Preconditions.checkNotNull(checksumType);
395       this.checksumType = checksumType;
396       return this;
397     }
398 
399     public WriterFactory withBytesPerChecksum(int bytesPerChecksum) {
400       this.bytesPerChecksum = bytesPerChecksum;
401       return this;
402     }
403 
404     /**
405      * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
406      * @return this (for chained invocation)
407      */
408     public WriterFactory includeMVCCReadpoint(boolean includeMVCCReadpoint) {
409       this.includeMVCCReadpoint = includeMVCCReadpoint;
410       return this;
411     }
412 
413     public Writer create() throws IOException {
414       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
415         throw new AssertionError("Please specify exactly one of " +
416             "filesystem/path or path");
417       }
418       if (path != null) {
419         ostream = AbstractHFileWriter.createOutputStream(conf, fs, path);
420       }
421       return createWriter(fs, path, ostream, blockSize,
422           compression, encoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint);
423     }
424 
425     protected abstract Writer createWriter(FileSystem fs, Path path,
426         FSDataOutputStream ostream, int blockSize,
427         Compression.Algorithm compress,
428         HFileDataBlockEncoder dataBlockEncoder,
429         KeyComparator comparator, ChecksumType checksumType,
430         int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException;
431   }
432 
433   /** The configuration key for HFile version to use for new files */
434   public static final String FORMAT_VERSION_KEY = "hfile.format.version";
435 
436   public static int getFormatVersion(Configuration conf) {
437     int version = conf.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
438     checkFormatVersion(version);
439     return version;
440   }
441 
442   /**
443    * Returns the factory to be used to create {@link HFile} writers.
444    * Disables block cache access for all writers created through the
445    * returned factory.
446    */
447   public static final WriterFactory getWriterFactoryNoCache(Configuration
448        conf) {
449     Configuration tempConf = new Configuration(conf);
450     tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
451     return HFile.getWriterFactory(conf, new CacheConfig(tempConf));
452   }
453 
454   /**
455    * Returns the factory to be used to create {@link HFile} writers
456    */
457   public static final WriterFactory getWriterFactory(Configuration conf,
458       CacheConfig cacheConf) {
459     int version = getFormatVersion(conf);
460     switch (version) {
461     case 2:
462       return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
463     default:
464       throw new IllegalArgumentException("Cannot create writer for HFile " +
465           "format version " + version);
466     }
467   }
468 
  /** An abstraction used by the block index */
  public interface CachingBlockReader {
    /**
     * Reads the block at {@code offset}.
     *
     * @param offset position of the block in the file (presumably a byte
     *     offset -- confirm)
     * @param onDiskBlockSize size of the block as stored on disk, as known to
     *     the caller (NOTE(review): any "unknown size" sentinel is
     *     implementation-defined -- confirm)
     * @param cacheBlock whether the block may be added to the block cache
     * @param pread whether to use a positional read instead of seek+read
     * @param isCompaction whether the read is on behalf of a compaction
     * @param expectedBlockType the block type the caller expects; presumably
     *     checked by implementations against what was read -- confirm
     * @return the block that was read
     * @throws IOException if the block cannot be read
     */
    HFileBlock readBlock(long offset, long onDiskBlockSize,
        boolean cacheBlock, final boolean pread, final boolean isCompaction,
        BlockType expectedBlockType)
        throws IOException;
  }
476 
  /**
   * An interface used by clients to open and iterate an {@link HFile}.
   * Instances are obtained through the {@code createReader*} factory methods
   * of this class.
   */
  public interface Reader extends Closeable, CachingBlockReader {
    /**
     * Returns this reader's "name". Usually the last component of the path.
     * Needs to be constant as the file is being moved to support caching on
     * write.
     */
    String getName();

    /** @return the key comparator this reader uses (presumably the one recorded in the file -- confirm) */
    RawComparator<byte []> getComparator();

    /**
     * Returns a scanner over this file.
     *
     * @param cacheBlocks whether blocks read by the scanner may be cached
     * @param pread whether to use positional reads rather than seek+read
     * @param isCompaction whether the scanner serves a compaction
     */
    HFileScanner getScanner(boolean cacheBlocks,
       final boolean pread, final boolean isCompaction);

    /**
     * Looks up a meta block by name.
     *
     * @param metaBlockName name the meta block was stored under
     * @param cacheBlock whether the block may be cached once read
     * @return the block contents. NOTE(review): the result for a missing block
     *     is implementation-defined here (possibly null) -- confirm.
     */
    ByteBuffer getMetaBlock(String metaBlockName,
       boolean cacheBlock) throws IOException;

    /** Loads and returns the file info map (the {@code FileInfo} entries). */
    Map<byte[], byte[]> loadFileInfo() throws IOException;

    /** @return the last key in the file (result for an empty file: confirm) */
    byte[] getLastKey();

    /** @return a middle key of the file (how it is chosen is implementation-defined) */
    byte[] midkey() throws IOException;

    /** @return the file length (presumably the size given when the reader was opened -- confirm) */
    long length();

    /** @return the number of entries in the file */
    long getEntries();

    /** @return the first key in the file (result for an empty file: confirm) */
    byte[] getFirstKey();

    /** @return the size of the index (units/what is counted: confirm in implementations) */
    long indexSize();

    /** @return the row of the first key in the file */
    byte[] getFirstRowKey();

    /** @return the row of the last key in the file */
    byte[] getLastRowKey();

    /** @return the fixed file trailer that was read when this reader was opened */
    FixedFileTrailer getTrailer();

    /** @return the reader for the data block index */
    HFileBlockIndex.BlockIndexReader getDataBlockIndexReader();

    /**
     * Returns a scanner over this file; the two-argument form of
     * {@link #getScanner(boolean, boolean, boolean)} (presumably with
     * {@code isCompaction=false} -- confirm).
     */
    HFileScanner getScanner(boolean cacheBlocks, boolean pread);

    /** @return the compression algorithm used by this file */
    Compression.Algorithm getCompressionAlgorithm();

    /**
     * Retrieves general Bloom filter metadata as appropriate for each
     * {@link HFile} version.
     * Knows nothing about how that metadata is structured.
     */
    DataInput getGeneralBloomFilterMetadata() throws IOException;

    /**
     * Retrieves delete family Bloom filter metadata as appropriate for each
     * {@link HFile}  version.
     * Knows nothing about how that metadata is structured.
     */
    DataInput getDeleteBloomFilterMetadata() throws IOException;

    /** @return the path of the file being read */
    Path getPath();

    /**
     * Close method with optional evictOnClose.
     *
     * @param evictOnClose presumably whether to evict this file's blocks from
     *     the block cache on close -- confirm
     */
    void close(boolean evictOnClose) throws IOException;

    /** @return the data block encoding used on disk by this file */
    DataBlockEncoding getEncodingOnDisk();
  }
541 
542   /**
543    * Method returns the reader given the specified arguments.
544    * TODO This is a bad abstraction.  See HBASE-6635.
545    *
546    * @param path hfile's path
547    * @param fsdis stream of path's file
548    * @param size max size of the trailer.
549    * @param cacheConf Cache configuation values, cannot be null.
550    * @param preferredEncodingInCache
551    * @param hfs
552    * @return an appropriate instance of HFileReader
553    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
554    */
555   private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
556       long size, CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
557       HFileSystem hfs) throws IOException {
558     FixedFileTrailer trailer = null;
559     try {
560       boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
561       assert !isHBaseChecksum; // Initially we must read with FS checksum.
562       trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
563     } catch (IllegalArgumentException iae) {
564       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
565     }
566     switch (trailer.getMajorVersion()) {
567     case 2:
568       return new HFileReaderV2(
569           path, trailer, fsdis, size, cacheConf, preferredEncodingInCache, hfs);
570     default:
571       throw new CorruptHFileException("Invalid HFile version " + trailer.getMajorVersion());
572     }
573   }
574 
575   /**
576    * @param fs A file system
577    * @param path Path to HFile
578    * @param cacheConf Cache configuration for hfile's contents
579    * @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
580    * @return A version specific Hfile Reader
581    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
582    */
583   public static Reader createReaderWithEncoding(
584       FileSystem fs, Path path, CacheConfig cacheConf,
585       DataBlockEncoding preferredEncodingInCache) throws IOException {
586     final boolean closeIStream = true;
587     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
588     return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
589         cacheConf, preferredEncodingInCache, stream.getHfs());
590   }
591 
592   /**
593    * @param fs A file system
594    * @param path Path to HFile
595    * @param fsdis a stream of path's file
596    * @param size max size of the trailer.
597    * @param cacheConf Cache configuration for hfile's contents
598    * @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
599    * @return A version specific Hfile Reader
600    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
601    */
602   public static Reader createReaderWithEncoding(FileSystem fs, Path path,
603       FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf,
604       DataBlockEncoding preferredEncodingInCache) throws IOException {
605     HFileSystem hfs = null;
606 
607     // If the fs is not an instance of HFileSystem, then create an
608     // instance of HFileSystem that wraps over the specified fs.
609     // In this case, we will not be able to avoid checksumming inside
610     // the filesystem.
611     if (!(fs instanceof HFileSystem)) {
612       hfs = new HFileSystem(fs);
613     } else {
614       hfs = (HFileSystem)fs;
615     }
616     return pickReaderVersion(path, fsdis, size, cacheConf, preferredEncodingInCache, hfs);
617   }
618 
619   /**
620    *
621    * @param fs filesystem
622    * @param path Path to file to read
623    * @param cacheConf This must not be null.  @see {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
624    * @return an active Reader instance
625    * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
626    */
627   public static Reader createReader(
628       FileSystem fs, Path path, CacheConfig cacheConf) throws IOException {
629     Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
630     return createReaderWithEncoding(fs, path, cacheConf,
631         DataBlockEncoding.NONE);
632   }
633 
634   /**
635    * This factory method is used only by unit tests
636    */
637   static Reader createReaderFromStream(Path path,
638       FSDataInputStream fsdis, long size, CacheConfig cacheConf)
639       throws IOException {
640     FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
641     return pickReaderVersion(path, wrapper, size, cacheConf, DataBlockEncoding.NONE, null);
642   }
643 
644   /**
645    * Metadata for this file.  Conjured by the writer.  Read in by the reader.
646    */
647   static class FileInfo implements SortedMap<byte [], byte []> {
    /** Prefix of file info keys reserved for HFile's own entries; append(k, v, true) rejects such keys. */
    static final String RESERVED_PREFIX = "hfile.";
    /** {@link #RESERVED_PREFIX} as bytes. */
    static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
    // Reserved keys written by HFile itself (all carry RESERVED_PREFIX).
    static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
    static final byte [] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
    static final byte [] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
    static final byte [] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
    // Backing store for every SortedMap method. Keys are ordered by
    // Bytes.BYTES_COMPARATOR (presumably lexicographic on content), so byte[]
    // keys are looked up by content rather than by array identity.
    private final SortedMap<byte [], byte []> map = new TreeMap<byte [], byte []>(Bytes.BYTES_COMPARATOR);
655 
656     public FileInfo() {
657       super();
658     }
659 
660     /**
661      * Append the given key/value pair to the file info, optionally checking the
662      * key prefix.
663      *
664      * @param k key to add
665      * @param v value to add
666      * @param checkPrefix whether to check that the provided key does not start
667      *          with the reserved prefix
668      * @return this file info object
669      * @throws IOException if the key or value is invalid
670      */
671     public FileInfo append(final byte[] k, final byte[] v,
672         final boolean checkPrefix) throws IOException {
673       if (k == null || v == null) {
674         throw new NullPointerException("Key nor value may be null");
675       }
676       if (checkPrefix && isReservedFileInfoKey(k)) {
677         throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX
678             + " are reserved");
679       }
680       put(k, v);
681       return this;
682     }
683 
684     public void clear() {
685       this.map.clear();
686     }
687 
688     public Comparator<? super byte[]> comparator() {
689       return map.comparator();
690     }
691 
692     public boolean containsKey(Object key) {
693       return map.containsKey(key);
694     }
695 
696     public boolean containsValue(Object value) {
697       return map.containsValue(value);
698     }
699 
700     public Set<java.util.Map.Entry<byte[], byte[]>> entrySet() {
701       return map.entrySet();
702     }
703 
704     public boolean equals(Object o) {
705       return map.equals(o);
706     }
707 
708     public byte[] firstKey() {
709       return map.firstKey();
710     }
711 
712     public byte[] get(Object key) {
713       return map.get(key);
714     }
715 
716     public int hashCode() {
717       return map.hashCode();
718     }
719 
720     public SortedMap<byte[], byte[]> headMap(byte[] toKey) {
721       return this.map.headMap(toKey);
722     }
723 
724     public boolean isEmpty() {
725       return map.isEmpty();
726     }
727 
728     public Set<byte[]> keySet() {
729       return map.keySet();
730     }
731 
732     public byte[] lastKey() {
733       return map.lastKey();
734     }
735 
736     public byte[] put(byte[] key, byte[] value) {
737       return this.map.put(key, value);
738     }
739 
740     public void putAll(Map<? extends byte[], ? extends byte[]> m) {
741       this.map.putAll(m);
742     }
743 
744     public byte[] remove(Object key) {
745       return this.map.remove(key);
746     }
747 
748     public int size() {
749       return map.size();
750     }
751 
752     public SortedMap<byte[], byte[]> subMap(byte[] fromKey, byte[] toKey) {
753       return this.map.subMap(fromKey, toKey);
754     }
755 
756     public SortedMap<byte[], byte[]> tailMap(byte[] fromKey) {
757       return this.map.tailMap(fromKey);
758     }
759 
760     public Collection<byte[]> values() {
761       return map.values();
762     }
763 
764     /**
765      * Write out this instance on the passed in <code>out</code> stream.
766      * We write it as a protobuf.
767      * @param out
768      * @throws IOException
769      * @see {@link #read(DataInputStream)}
770      */
771     void write(final DataOutputStream out) throws IOException {
772       HFileProtos.FileInfoProto.Builder builder = HFileProtos.FileInfoProto.newBuilder();
773       for (Map.Entry<byte [], byte[]> e: this.map.entrySet()) {
774         HBaseProtos.BytesBytesPair.Builder bbpBuilder = HBaseProtos.BytesBytesPair.newBuilder();
775         bbpBuilder.setFirst(ByteString.copyFrom(e.getKey()));
776         bbpBuilder.setSecond(ByteString.copyFrom(e.getValue()));
777         builder.addMapEntry(bbpBuilder.build());
778       }
779       out.write(ProtobufUtil.PB_MAGIC);
780       builder.build().writeDelimitedTo(out);
781     }
782 
783     /**
784      * Populate this instance with what we find on the passed in <code>in</code> stream.
785      * Can deserialize protobuf of old Writables format.
786      * @param in
787      * @throws IOException
788      * @see {@link #write(DataOutputStream)}
789      */
790     void read(final DataInputStream in) throws IOException {
791       // This code is tested over in TestHFileReaderV1 where we read an old hfile w/ this new code.
792       int pblen = ProtobufUtil.lengthOfPBMagic();
793       byte [] pbuf = new byte[pblen];
794       if (in.markSupported()) in.mark(pblen);
795       int read = in.read(pbuf);
796       if (read != pblen) throw new IOException("read=" + read + ", wanted=" + pblen);
797       if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
798         parsePB(HFileProtos.FileInfoProto.parseDelimitedFrom(in));
799       } else {
800         if (in.markSupported()) {
801           in.reset();
802           parseWritable(in);
803         } else {
804           // We cannot use BufferedInputStream, it consumes more than we read from the underlying IS
805           ByteArrayInputStream bais = new ByteArrayInputStream(pbuf);
806           SequenceInputStream sis = new SequenceInputStream(bais, in); // Concatenate input streams
807           // TODO: Am I leaking anything here wrapping the passed in stream?  We are not calling close on the wrapped
808           // streams but they should be let go after we leave this context?  I see that we keep a reference to the
809           // passed in inputstream but since we no longer have a reference to this after we leave, we should be ok.
810           parseWritable(new DataInputStream(sis));
811         }
812       }
813     }
814 
815     /** Now parse the old Writable format.  It was a list of Map entries.  Each map entry was a key and a value of
816      * a byte [].  The old map format had a byte before each entry that held a code which was short for the key or
817      * value type.  We know it was a byte [] so in below we just read and dump it.
818      * @throws IOException 
819      */
820     void parseWritable(final DataInputStream in) throws IOException {
821       // First clear the map.  Otherwise we will just accumulate entries every time this method is called.
822       this.map.clear();
823       // Read the number of entries in the map
824       int entries = in.readInt();
825       // Then read each key/value pair
826       for (int i = 0; i < entries; i++) {
827         byte [] key = Bytes.readByteArray(in);
828         // We used to read a byte that encoded the class type.  Read and ignore it because it is always byte [] in hfile
829         in.readByte();
830         byte [] value = Bytes.readByteArray(in);
831         this.map.put(key, value);
832       }
833     }
834 
835     /**
836      * Fill our map with content of the pb we read off disk
837      * @param fip protobuf message to read
838      */
839     void parsePB(final HFileProtos.FileInfoProto fip) {
840       this.map.clear();
841       for (BytesBytesPair pair: fip.getMapEntryList()) {
842         this.map.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
843       }
844     }
845   }
846 
847   /** Return true if the given file info key is reserved for internal use. */
848   public static boolean isReservedFileInfoKey(byte[] key) {
849     return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
850   }
851 
852   /**
853    * Get names of supported compression algorithms. The names are acceptable by
854    * HFile.Writer.
855    *
856    * @return Array of strings, each represents a supported compression
857    *         algorithm. Currently, the following compression algorithms are
858    *         supported.
859    *         <ul>
860    *         <li>"none" - No compression.
861    *         <li>"gz" - GZIP compression.
862    *         </ul>
863    */
864   public static String[] getSupportedCompressionAlgorithms() {
865     return Compression.getSupportedAlgorithms();
866   }
867 
868   // Utility methods.
869   /*
870    * @param l Long to convert to an int.
871    * @return <code>l</code> cast as an int.
872    */
873   static int longToInt(final long l) {
874     // Expecting the size() of a block not exceeding 4GB. Assuming the
875     // size() will wrap to negative integer if it exceeds 2GB (From tfile).
876     return (int)(l & 0x00000000ffffffffL);
877   }
878 
879   /**
880    * Returns all files belonging to the given region directory. Could return an
881    * empty list.
882    *
883    * @param fs  The file system reference.
884    * @param regionDir  The region directory to scan.
885    * @return The list of files found.
886    * @throws IOException When scanning the files fails.
887    */
888   static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
889       throws IOException {
890     List<Path> res = new ArrayList<Path>();
891     PathFilter dirFilter = new FSUtils.DirFilter(fs);
892     FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter);
893     for(FileStatus dir : familyDirs) {
894       FileStatus[] files = fs.listStatus(dir.getPath());
895       for (FileStatus file : files) {
896         if (!file.isDir()) {
897           res.add(file.getPath());
898         }
899       }
900     }
901     return res;
902   }
903 
904   public static void main(String[] args) throws IOException {
905     HFilePrettyPrinter prettyPrinter = new HFilePrettyPrinter();
906     System.exit(prettyPrinter.run(args));
907   }
908 
909   /**
910    * Checks the given {@link HFile} format version, and throws an exception if
911    * invalid. Note that if the version number comes from an input file and has
912    * not been verified, the caller needs to re-throw an {@link IOException} to
913    * indicate that this is not a software error, but corrupted input.
914    *
915    * @param version an HFile version
916    * @throws IllegalArgumentException if the version is invalid
917    */
918   public static void checkFormatVersion(int version)
919       throws IllegalArgumentException {
920     if (version < MIN_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
921       throw new IllegalArgumentException("Invalid HFile version: " + version
922           + " (expected to be " + "between " + MIN_FORMAT_VERSION + " and "
923           + MAX_FORMAT_VERSION + ")");
924     }
925   }
926 }