View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.Closeable;
23  import java.io.DataInput;
24  import java.io.DataInputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.io.SequenceInputStream;
28  import java.net.InetSocketAddress;
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Comparator;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.SortedMap;
37  import java.util.TreeMap;
38  import java.util.concurrent.ArrayBlockingQueue;
39  import java.util.concurrent.BlockingQueue;
40  import java.util.concurrent.atomic.AtomicInteger;
41  import java.util.concurrent.atomic.AtomicLong;
42  
43  import com.google.protobuf.HBaseZeroCopyByteString;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.classification.InterfaceAudience;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.fs.FSDataInputStream;
49  import org.apache.hadoop.fs.FSDataOutputStream;
50  import org.apache.hadoop.fs.FileStatus;
51  import org.apache.hadoop.fs.FileSystem;
52  import org.apache.hadoop.fs.Path;
53  import org.apache.hadoop.fs.PathFilter;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.KeyValue;
56  import org.apache.hadoop.hbase.KeyValue.KVComparator;
57  import org.apache.hadoop.hbase.fs.HFileSystem;
58  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
59  import org.apache.hadoop.hbase.io.compress.Compression;
60  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
61  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
63  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
64  import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
65  import org.apache.hadoop.hbase.util.BloomFilterWriter;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.ChecksumType;
68  import org.apache.hadoop.hbase.util.FSUtils;
69  import org.apache.hadoop.io.Writable;
70  
71  import com.google.common.base.Preconditions;
72  import com.google.common.collect.Lists;
73  
74  /**
75   * File format for hbase.
76   * A file of sorted key/value pairs. Both keys and values are byte arrays.
77   * <p>
78   * The memory footprint of a HFile includes the following (below is taken from the
79   * <a
80   * href=https://issues.apache.org/jira/browse/HADOOP-3315>TFile</a> documentation
81   * but applies also to HFile):
82   * <ul>
83   * <li>Some constant overhead of reading or writing a compressed block.
84   * <ul>
85   * <li>Each compressed block requires one compression/decompression codec for
86   * I/O.
87   * <li>Temporary space to buffer the key.
88   * <li>Temporary space to buffer the value.
89   * </ul>
90   * <li>HFile index, which is proportional to the total number of Data Blocks.
91   * The total amount of memory needed to hold the index can be estimated as
92   * (56+AvgKeySize)*NumBlocks.
93   * </ul>
94   * Suggestions on performance optimization.
95   * <ul>
96   * <li>Minimum block size. We recommend a setting of minimum block size between
97   * 8KB to 1MB for general usage. Larger block size is preferred if files are
98   * primarily for sequential access. However, it would lead to inefficient random
99   * access (because there are more data to decompress). Smaller blocks are good
100  * for random access, but require more memory to hold the block index, and may
101  * be slower to create (because we must flush the compressor stream at the
102  * conclusion of each data block, which leads to an FS I/O flush). Further, due
103  * to the internal caching in Compression codec, the smallest possible block
104  * size would be around 20KB-30KB.
105  * <li>The current implementation does not offer true multi-threading for
106  * reading. The implementation uses FSDataInputStream seek()+read(), which is
107  * shown to be much faster than positioned-read call in single thread mode.
108  * However, it also means that if multiple threads attempt to access the same
109  * HFile (using multiple scanners) simultaneously, the actual I/O is carried out
110  * sequentially even if they access different DFS blocks (Reexamine! pread seems
111  * to be 10% faster than seek+read in my testing -- stack).
112  * <li>Compression codec. Use "none" if the data is not very compressable (by
113  * compressable, I mean a compression ratio at least 2:1). Generally, use "lzo"
114  * as the starting point for experimenting. "gz" overs slightly better
115  * compression ratio over "lzo" but requires 4x CPU to compress and 2x CPU to
116  * decompress, comparing to "lzo".
117  * </ul>
118  *
119  * For more on the background behind HFile, see <a
120  * href=https://issues.apache.org/jira/browse/HBASE-61>HBASE-61</a>.
121  * <p>
122  * File is made of data blocks followed by meta data blocks (if any), a fileinfo
123  * block, data block index, meta data block index, and a fixed size trailer
124  * which records the offsets at which file changes content type.
125  * <pre>&lt;data blocks>&lt;meta blocks>&lt;fileinfo>&lt;data index>&lt;meta index>&lt;trailer></pre>
126  * Each block has a bit of magic at its start.  Block are comprised of
127  * key/values.  In data blocks, they are both byte arrays.  Metadata blocks are
128  * a String key and a byte array value.  An empty file looks like this:
129  * <pre>&lt;fileinfo>&lt;trailer></pre>.  That is, there are not data nor meta
130  * blocks present.
131  * <p>
132  * TODO: Do scanners need to be able to take a start and end row?
133  * TODO: Should BlockIndex know the name of its file?  Should it have a Path
134  * that points at its file say for the case where an index lives apart from
135  * an HFile instance?
136  */
137 @InterfaceAudience.Private
138 public class HFile {
139   static final Log LOG = LogFactory.getLog(HFile.class);
140 
141   /**
142    * Maximum length of key in HFile.
143    */
144   public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE;
145 
146   /**
147    * Default compression: none.
148    */
149   public final static Compression.Algorithm DEFAULT_COMPRESSION_ALGORITHM =
150     Compression.Algorithm.NONE;
151 
152   /** Minimum supported HFile format version */
153   public static final int MIN_FORMAT_VERSION = 2;
154 
155   /** Maximum supported HFile format version
156    */
157   public static final int MAX_FORMAT_VERSION = 3;
158 
159   /**
160    * Minimum HFile format version with support for persisting cell tags
161    */
162   public static final int MIN_FORMAT_VERSION_WITH_TAGS = 3;
163 
164   /** Default compression name: none. */
165   public final static String DEFAULT_COMPRESSION =
166     DEFAULT_COMPRESSION_ALGORITHM.getName();
167 
168   /** Meta data block name for bloom filter bits. */
169   public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
170 
171   /**
172    * We assume that HFile path ends with
173    * ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
174    * many levels of nesting. This is needed for identifying table and CF name
175    * from an HFile path.
176    */
177   public final static int MIN_NUM_HFILE_PATH_LEVELS = 5;
178 
179   /**
180    * The number of bytes per checksum.
181    */
182   public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
183   public static final ChecksumType DEFAULT_CHECKSUM_TYPE = ChecksumType.CRC32;
184 
185   // For measuring latency of "sequential" reads and writes
186   private static final AtomicInteger readOps = new AtomicInteger();
187   private static final AtomicLong readTimeNano = new AtomicLong();
188   private static final AtomicInteger writeOps = new AtomicInteger();
189   private static final AtomicLong writeTimeNano = new AtomicLong();
190 
191   // For measuring latency of pread
192   private static final AtomicInteger preadOps = new AtomicInteger();
193   private static final AtomicLong preadTimeNano = new AtomicLong();
194 
195   // For measuring number of checksum failures
196   static final AtomicLong checksumFailures = new AtomicLong();
197 
198   // For getting more detailed stats on FS latencies
199   // If, for some reason, the metrics subsystem stops polling for latencies,
200   // I don't want data to pile up in a memory leak
201   // so, after LATENCY_BUFFER_SIZE items have been enqueued for processing,
202   // fs latency stats will be dropped (and this behavior will be logged)
203   private static final int LATENCY_BUFFER_SIZE = 5000;
204   private static final BlockingQueue<Long> fsReadLatenciesNanos =
205       new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
206   private static final BlockingQueue<Long> fsWriteLatenciesNanos =
207       new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
208   private static final BlockingQueue<Long> fsPreadLatenciesNanos =
209       new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
210 
211   public static final void offerReadLatency(long latencyNanos, boolean pread) {
212     if (pread) {
213       fsPreadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
214       preadOps.incrementAndGet();
215       preadTimeNano.addAndGet(latencyNanos);
216     } else {
217       fsReadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
218       readTimeNano.addAndGet(latencyNanos);
219       readOps.incrementAndGet();
220     }
221   }
222 
223   public static final void offerWriteLatency(long latencyNanos) {
224     fsWriteLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
225 
226     writeTimeNano.addAndGet(latencyNanos);
227     writeOps.incrementAndGet();
228   }
229 
230   public static final Collection<Long> getReadLatenciesNanos() {
231     final List<Long> latencies =
232         Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size());
233     fsReadLatenciesNanos.drainTo(latencies);
234     return latencies;
235   }
236 
237   public static final Collection<Long> getPreadLatenciesNanos() {
238     final List<Long> latencies =
239         Lists.newArrayListWithCapacity(fsPreadLatenciesNanos.size());
240     fsPreadLatenciesNanos.drainTo(latencies);
241     return latencies;
242   }
243 
244   public static final Collection<Long> getWriteLatenciesNanos() {
245     final List<Long> latencies =
246         Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size());
247     fsWriteLatenciesNanos.drainTo(latencies);
248     return latencies;
249   }
250 
251   // for test purpose
252   public static final AtomicLong dataBlockReadCnt = new AtomicLong(0);
253 
254   // number of sequential reads
255   public static final int getReadOps() {
256     return readOps.getAndSet(0);
257   }
258 
259   public static final long getReadTimeMs() {
260     return readTimeNano.getAndSet(0) / 1000000;
261   }
262 
263   // number of positional reads
264   public static final int getPreadOps() {
265     return preadOps.getAndSet(0);
266   }
267 
268   public static final long getPreadTimeMs() {
269     return preadTimeNano.getAndSet(0) / 1000000;
270   }
271 
272   public static final int getWriteOps() {
273     return writeOps.getAndSet(0);
274   }
275 
276   public static final long getWriteTimeMs() {
277     return writeTimeNano.getAndSet(0) / 1000000;
278   }
279 
280   /**
281    * Number of checksum verification failures. It also
282    * clears the counter.
283    */
284   public static final long getChecksumFailuresCount() {
285     return checksumFailures.getAndSet(0);
286   }
287 
288   /** API required to write an {@link HFile} */
289   public interface Writer extends Closeable {
290 
291     /** Add an element to the file info map. */
292     void appendFileInfo(byte[] key, byte[] value) throws IOException;
293 
294     void append(KeyValue kv) throws IOException;
295 
296     void append(byte[] key, byte[] value) throws IOException;
297 
298     void append (byte[] key, byte[] value, byte[] tag) throws IOException;
299 
300     /** @return the path to this {@link HFile} */
301     Path getPath();
302 
303     /**
304      * Adds an inline block writer such as a multi-level block index writer or
305      * a compound Bloom filter writer.
306      */
307     void addInlineBlockWriter(InlineBlockWriter bloomWriter);
308 
309     // The below three methods take Writables.  We'd like to undo Writables but undoing the below would be pretty
310     // painful.  Could take a byte [] or a Message but we want to be backward compatible around hfiles so would need
311     // to map between Message and Writable or byte [] and current Writable serialization.  This would be a bit of work
312     // to little gain.  Thats my thinking at moment.  St.Ack 20121129
313 
314     void appendMetaBlock(String bloomFilterMetaKey, Writable metaWriter);
315 
316     /**
317      * Store general Bloom filter in the file. This does not deal with Bloom filter
318      * internals but is necessary, since Bloom filters are stored differently
319      * in HFile version 1 and version 2.
320      */
321     void addGeneralBloomFilter(BloomFilterWriter bfw);
322 
323     /**
324      * Store delete family Bloom filter in the file, which is only supported in
325      * HFile V2.
326      */
327     void addDeleteFamilyBloomFilter(BloomFilterWriter bfw) throws IOException;
328 
329     /**
330      * Return the file context for the HFile this writer belongs to
331      */
332     HFileContext getFileContext();
333   }
334 
335   /**
336    * This variety of ways to construct writers is used throughout the code, and
337    * we want to be able to swap writer implementations.
338    */
339   public static abstract class WriterFactory {
340     protected final Configuration conf;
341     protected final CacheConfig cacheConf;
342     protected FileSystem fs;
343     protected Path path;
344     protected FSDataOutputStream ostream;
345     protected KVComparator comparator = KeyValue.COMPARATOR;
346     protected InetSocketAddress[] favoredNodes;
347     private HFileContext fileContext;
348 
349     WriterFactory(Configuration conf, CacheConfig cacheConf) {
350       this.conf = conf;
351       this.cacheConf = cacheConf;
352     }
353 
354     public WriterFactory withPath(FileSystem fs, Path path) {
355       Preconditions.checkNotNull(fs);
356       Preconditions.checkNotNull(path);
357       this.fs = fs;
358       this.path = path;
359       return this;
360     }
361 
362     public WriterFactory withOutputStream(FSDataOutputStream ostream) {
363       Preconditions.checkNotNull(ostream);
364       this.ostream = ostream;
365       return this;
366     }
367 
368     public WriterFactory withComparator(KVComparator comparator) {
369       Preconditions.checkNotNull(comparator);
370       this.comparator = comparator;
371       return this;
372     }
373 
374     public WriterFactory withFavoredNodes(InetSocketAddress[] favoredNodes) {
375       // Deliberately not checking for null here.
376       this.favoredNodes = favoredNodes;
377       return this;
378     }
379 
380     public WriterFactory withFileContext(HFileContext fileContext) {
381       this.fileContext = fileContext;
382       return this;
383     }
384 
385     public Writer create() throws IOException {
386       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
387         throw new AssertionError("Please specify exactly one of " +
388             "filesystem/path or path");
389       }
390       if (path != null) {
391         ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
392       }
393       return createWriter(fs, path, ostream,
394                    comparator, fileContext);
395     }
396 
397     protected abstract Writer createWriter(FileSystem fs, Path path, FSDataOutputStream ostream,
398         KVComparator comparator, HFileContext fileContext) throws IOException;
399   }
400 
401   /** The configuration key for HFile version to use for new files */
402   public static final String FORMAT_VERSION_KEY = "hfile.format.version";
403 
404   public static int getFormatVersion(Configuration conf) {
405     int version = conf.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
406     checkFormatVersion(version);
407     return version;
408   }
409 
410   /**
411    * Returns the factory to be used to create {@link HFile} writers.
412    * Disables block cache access for all writers created through the
413    * returned factory.
414    */
415   public static final WriterFactory getWriterFactoryNoCache(Configuration
416        conf) {
417     Configuration tempConf = new Configuration(conf);
418     tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
419     return HFile.getWriterFactory(conf, new CacheConfig(tempConf));
420   }
421 
422   /**
423    * Returns the factory to be used to create {@link HFile} writers
424    */
425   public static final WriterFactory getWriterFactory(Configuration conf,
426       CacheConfig cacheConf) {
427     int version = getFormatVersion(conf);
428     switch (version) {
429     case 2:
430       return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
431     case 3:
432       return new HFileWriterV3.WriterFactoryV3(conf, cacheConf);
433     default:
434       throw new IllegalArgumentException("Cannot create writer for HFile " +
435           "format version " + version);
436     }
437   }
438 
439   /** An abstraction used by the block index */
440   public interface CachingBlockReader {
441     /**
442      * Read in a file block.
443      * @param offset offset to read.
444      * @param onDiskBlockSize size of the block
445      * @param cacheBlock
446      * @param pread
447      * @param isCompaction is this block being read as part of a compaction
448      * @param expectedBlockType the block type we are expecting to read with this read operation, or
449      *          null to read whatever block type is available and avoid checking (that might reduce
450      *          caching efficiency of encoded data blocks)
451      * @param expectedDataBlockEncoding the data block encoding the caller is
452      *          expecting data blocks to be in, or null to not perform this
453      *          check and return the block irrespective of the encoding. This
454      *          check only applies to data blocks and can be set to null when
455      *          the caller is expecting to read a non-data block and has set
456      *          expectedBlockType accordingly.
457      * @return Block wrapped in a ByteBuffer.
458      * @throws IOException
459      */
460     HFileBlock readBlock(long offset, long onDiskBlockSize,
461         boolean cacheBlock, final boolean pread, final boolean isCompaction,
462         final boolean updateCacheMetrics, BlockType expectedBlockType,
463         DataBlockEncoding expectedDataBlockEncoding)
464         throws IOException;
465   }
466 
467   /** An interface used by clients to open and iterate an {@link HFile}. */
468   public interface Reader extends Closeable, CachingBlockReader {
469     /**
470      * Returns this reader's "name". Usually the last component of the path.
471      * Needs to be constant as the file is being moved to support caching on
472      * write.
473      */
474     String getName();
475 
476     KVComparator getComparator();
477 
478     HFileScanner getScanner(boolean cacheBlocks,
479        final boolean pread, final boolean isCompaction);
480 
481     ByteBuffer getMetaBlock(String metaBlockName,
482        boolean cacheBlock) throws IOException;
483 
484     Map<byte[], byte[]> loadFileInfo() throws IOException;
485 
486     byte[] getLastKey();
487 
488     byte[] midkey() throws IOException;
489 
490     long length();
491 
492     long getEntries();
493 
494     byte[] getFirstKey();
495 
496     long indexSize();
497 
498     byte[] getFirstRowKey();
499 
500     byte[] getLastRowKey();
501 
502     FixedFileTrailer getTrailer();
503 
504     HFileBlockIndex.BlockIndexReader getDataBlockIndexReader();
505 
506     HFileScanner getScanner(boolean cacheBlocks, boolean pread);
507 
508     Compression.Algorithm getCompressionAlgorithm();
509 
510     /**
511      * Retrieves general Bloom filter metadata as appropriate for each
512      * {@link HFile} version.
513      * Knows nothing about how that metadata is structured.
514      */
515     DataInput getGeneralBloomFilterMetadata() throws IOException;
516 
517     /**
518      * Retrieves delete family Bloom filter metadata as appropriate for each
519      * {@link HFile}  version.
520      * Knows nothing about how that metadata is structured.
521      */
522     DataInput getDeleteBloomFilterMetadata() throws IOException;
523 
524     Path getPath();
525 
526     /** Close method with optional evictOnClose */
527     void close(boolean evictOnClose) throws IOException;
528 
529     DataBlockEncoding getDataBlockEncoding();
530 
531     boolean hasMVCCInfo();
532 
533     /**
534      * Return the file context of the HFile this reader belongs to
535      */
536     HFileContext getFileContext();
537   }
538 
539   /**
540    * Method returns the reader given the specified arguments.
541    * TODO This is a bad abstraction.  See HBASE-6635.
542    *
543    * @param path hfile's path
544    * @param fsdis stream of path's file
545    * @param size max size of the trailer.
546    * @param cacheConf Cache configuation values, cannot be null.
547    * @param hfs
548    * @return an appropriate instance of HFileReader
549    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
550    */
551   private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
552       long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
553     FixedFileTrailer trailer = null;
554     try {
555       boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
556       assert !isHBaseChecksum; // Initially we must read with FS checksum.
557       trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
558       switch (trailer.getMajorVersion()) {
559       case 2:
560         return new HFileReaderV2(path, trailer, fsdis, size, cacheConf, hfs, conf);
561       case 3 :
562         return new HFileReaderV3(path, trailer, fsdis, size, cacheConf, hfs, conf);
563       default:
564         throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
565       }
566     } catch (Throwable t) {
567       try {
568         fsdis.close();
569       } catch (Throwable t2) {
570         LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
571       }
572       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
573     }
574   }
575 
576   /**
577    * @param fs A file system
578    * @param path Path to HFile
579    * @param fsdis a stream of path's file
580    * @param size max size of the trailer.
581    * @param cacheConf Cache configuration for hfile's contents
582    * @param conf Configuration
583    * @return A version specific Hfile Reader
584    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
585    */
586   public static Reader createReader(FileSystem fs, Path path,
587       FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf)
588       throws IOException {
589     HFileSystem hfs = null;
590 
591     // If the fs is not an instance of HFileSystem, then create an
592     // instance of HFileSystem that wraps over the specified fs.
593     // In this case, we will not be able to avoid checksumming inside
594     // the filesystem.
595     if (!(fs instanceof HFileSystem)) {
596       hfs = new HFileSystem(fs);
597     } else {
598       hfs = (HFileSystem)fs;
599     }
600     return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf);
601   }
602 
603   /**
604    *
605    * @param fs filesystem
606    * @param path Path to file to read
607    * @param cacheConf This must not be null.  @see {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
608    * @return an active Reader instance
609    * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
610    */
611   public static Reader createReader(
612       FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
613     Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
614     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
615     return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
616       cacheConf, stream.getHfs(), conf);
617   }
618 
619   /**
620    * This factory method is used only by unit tests
621    */
622   static Reader createReaderFromStream(Path path,
623       FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
624       throws IOException {
625     FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
626     return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
627   }
628 
629   /**
630    * Metadata for this file. Conjured by the writer. Read in by the reader.
631    */
632   public static class FileInfo implements SortedMap<byte[], byte[]> {
633     static final String RESERVED_PREFIX = "hfile.";
634     static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
635     static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
636     static final byte [] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
637     static final byte [] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
638     static final byte [] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
639     static final byte [] TAGS_COMPRESSED = Bytes.toBytes(RESERVED_PREFIX + "TAGS_COMPRESSED");
640     public static final byte [] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
641     private final SortedMap<byte [], byte []> map = new TreeMap<byte [], byte []>(Bytes.BYTES_COMPARATOR);
642 
643     public FileInfo() {
644       super();
645     }
646 
647     /**
648      * Append the given key/value pair to the file info, optionally checking the
649      * key prefix.
650      *
651      * @param k key to add
652      * @param v value to add
653      * @param checkPrefix whether to check that the provided key does not start
654      *          with the reserved prefix
655      * @return this file info object
656      * @throws IOException if the key or value is invalid
657      */
658     public FileInfo append(final byte[] k, final byte[] v,
659         final boolean checkPrefix) throws IOException {
660       if (k == null || v == null) {
661         throw new NullPointerException("Key nor value may be null");
662       }
663       if (checkPrefix && isReservedFileInfoKey(k)) {
664         throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX
665             + " are reserved");
666       }
667       put(k, v);
668       return this;
669     }
670 
671     public void clear() {
672       this.map.clear();
673     }
674 
675     public Comparator<? super byte[]> comparator() {
676       return map.comparator();
677     }
678 
679     public boolean containsKey(Object key) {
680       return map.containsKey(key);
681     }
682 
683     public boolean containsValue(Object value) {
684       return map.containsValue(value);
685     }
686 
687     public Set<java.util.Map.Entry<byte[], byte[]>> entrySet() {
688       return map.entrySet();
689     }
690 
691     public boolean equals(Object o) {
692       return map.equals(o);
693     }
694 
695     public byte[] firstKey() {
696       return map.firstKey();
697     }
698 
699     public byte[] get(Object key) {
700       return map.get(key);
701     }
702 
703     public int hashCode() {
704       return map.hashCode();
705     }
706 
707     public SortedMap<byte[], byte[]> headMap(byte[] toKey) {
708       return this.map.headMap(toKey);
709     }
710 
711     public boolean isEmpty() {
712       return map.isEmpty();
713     }
714 
715     public Set<byte[]> keySet() {
716       return map.keySet();
717     }
718 
719     public byte[] lastKey() {
720       return map.lastKey();
721     }
722 
723     public byte[] put(byte[] key, byte[] value) {
724       return this.map.put(key, value);
725     }
726 
727     public void putAll(Map<? extends byte[], ? extends byte[]> m) {
728       this.map.putAll(m);
729     }
730 
731     public byte[] remove(Object key) {
732       return this.map.remove(key);
733     }
734 
735     public int size() {
736       return map.size();
737     }
738 
739     public SortedMap<byte[], byte[]> subMap(byte[] fromKey, byte[] toKey) {
740       return this.map.subMap(fromKey, toKey);
741     }
742 
743     public SortedMap<byte[], byte[]> tailMap(byte[] fromKey) {
744       return this.map.tailMap(fromKey);
745     }
746 
747     public Collection<byte[]> values() {
748       return map.values();
749     }
750 
751     /**
752      * Write out this instance on the passed in <code>out</code> stream.
753      * We write it as a protobuf.
754      * @param out
755      * @throws IOException
756      * @see #read(DataInputStream)
757      */
758     void write(final DataOutputStream out) throws IOException {
759       HFileProtos.FileInfoProto.Builder builder = HFileProtos.FileInfoProto.newBuilder();
760       for (Map.Entry<byte [], byte[]> e: this.map.entrySet()) {
761         HBaseProtos.BytesBytesPair.Builder bbpBuilder = HBaseProtos.BytesBytesPair.newBuilder();
762         bbpBuilder.setFirst(HBaseZeroCopyByteString.wrap(e.getKey()));
763         bbpBuilder.setSecond(HBaseZeroCopyByteString.wrap(e.getValue()));
764         builder.addMapEntry(bbpBuilder.build());
765       }
766       out.write(ProtobufUtil.PB_MAGIC);
767       builder.build().writeDelimitedTo(out);
768     }
769 
770     /**
771      * Populate this instance with what we find on the passed in <code>in</code> stream.
772      * Can deserialize protobuf of old Writables format.
773      * @param in
774      * @throws IOException
775      * @see #write(DataOutputStream)
776      */
777     void read(final DataInputStream in) throws IOException {
778       // This code is tested over in TestHFileReaderV1 where we read an old hfile w/ this new code.
779       int pblen = ProtobufUtil.lengthOfPBMagic();
780       byte [] pbuf = new byte[pblen];
781       if (in.markSupported()) in.mark(pblen);
782       int read = in.read(pbuf);
783       if (read != pblen) throw new IOException("read=" + read + ", wanted=" + pblen);
784       if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
785         parsePB(HFileProtos.FileInfoProto.parseDelimitedFrom(in));
786       } else {
787         if (in.markSupported()) {
788           in.reset();
789           parseWritable(in);
790         } else {
791           // We cannot use BufferedInputStream, it consumes more than we read from the underlying IS
792           ByteArrayInputStream bais = new ByteArrayInputStream(pbuf);
793           SequenceInputStream sis = new SequenceInputStream(bais, in); // Concatenate input streams
794           // TODO: Am I leaking anything here wrapping the passed in stream?  We are not calling close on the wrapped
795           // streams but they should be let go after we leave this context?  I see that we keep a reference to the
796           // passed in inputstream but since we no longer have a reference to this after we leave, we should be ok.
797           parseWritable(new DataInputStream(sis));
798         }
799       }
800     }
801 
802     /** Now parse the old Writable format.  It was a list of Map entries.  Each map entry was a key and a value of
803      * a byte [].  The old map format had a byte before each entry that held a code which was short for the key or
804      * value type.  We know it was a byte [] so in below we just read and dump it.
805      * @throws IOException
806      */
807     void parseWritable(final DataInputStream in) throws IOException {
808       // First clear the map.  Otherwise we will just accumulate entries every time this method is called.
809       this.map.clear();
810       // Read the number of entries in the map
811       int entries = in.readInt();
812       // Then read each key/value pair
813       for (int i = 0; i < entries; i++) {
814         byte [] key = Bytes.readByteArray(in);
815         // We used to read a byte that encoded the class type.  Read and ignore it because it is always byte [] in hfile
816         in.readByte();
817         byte [] value = Bytes.readByteArray(in);
818         this.map.put(key, value);
819       }
820     }
821 
822     /**
823      * Fill our map with content of the pb we read off disk
824      * @param fip protobuf message to read
825      */
826     void parsePB(final HFileProtos.FileInfoProto fip) {
827       this.map.clear();
828       for (BytesBytesPair pair: fip.getMapEntryList()) {
829         this.map.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
830       }
831     }
832   }
833 
834   /** Return true if the given file info key is reserved for internal use. */
835   public static boolean isReservedFileInfoKey(byte[] key) {
836     return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
837   }
838 
839   /**
840    * Get names of supported compression algorithms. The names are acceptable by
841    * HFile.Writer.
842    *
843    * @return Array of strings, each represents a supported compression
844    *         algorithm. Currently, the following compression algorithms are
845    *         supported.
846    *         <ul>
847    *         <li>"none" - No compression.
848    *         <li>"gz" - GZIP compression.
849    *         </ul>
850    */
851   public static String[] getSupportedCompressionAlgorithms() {
852     return Compression.getSupportedAlgorithms();
853   }
854 
855   // Utility methods.
856   /*
857    * @param l Long to convert to an int.
858    * @return <code>l</code> cast as an int.
859    */
860   static int longToInt(final long l) {
861     // Expecting the size() of a block not exceeding 4GB. Assuming the
862     // size() will wrap to negative integer if it exceeds 2GB (From tfile).
863     return (int)(l & 0x00000000ffffffffL);
864   }
865 
866   /**
867    * Returns all files belonging to the given region directory. Could return an
868    * empty list.
869    *
870    * @param fs  The file system reference.
871    * @param regionDir  The region directory to scan.
872    * @return The list of files found.
873    * @throws IOException When scanning the files fails.
874    */
875   static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
876       throws IOException {
877     List<Path> res = new ArrayList<Path>();
878     PathFilter dirFilter = new FSUtils.DirFilter(fs);
879     FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter);
880     for(FileStatus dir : familyDirs) {
881       FileStatus[] files = fs.listStatus(dir.getPath());
882       for (FileStatus file : files) {
883         if (!file.isDirectory()) {
884           res.add(file.getPath());
885         }
886       }
887     }
888     return res;
889   }
890 
891   public static void main(String[] args) throws IOException {
892     HFilePrettyPrinter prettyPrinter = new HFilePrettyPrinter();
893     System.exit(prettyPrinter.run(args));
894   }
895 
896   /**
897    * Checks the given {@link HFile} format version, and throws an exception if
898    * invalid. Note that if the version number comes from an input file and has
899    * not been verified, the caller needs to re-throw an {@link IOException} to
900    * indicate that this is not a software error, but corrupted input.
901    *
902    * @param version an HFile version
903    * @throws IllegalArgumentException if the version is invalid
904    */
905   public static void checkFormatVersion(int version)
906       throws IllegalArgumentException {
907     if (version < MIN_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
908       throw new IllegalArgumentException("Invalid HFile version: " + version
909           + " (expected to be " + "between " + MIN_FORMAT_VERSION + " and "
910           + MAX_FORMAT_VERSION + ")");
911     }
912   }
913 }