001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.io.hfile;
020
021import java.io.ByteArrayInputStream;
022import java.io.Closeable;
023import java.io.DataInput;
024import java.io.DataInputStream;
025import java.io.DataOutputStream;
026import java.io.IOException;
027import java.io.SequenceInputStream;
028import java.net.InetSocketAddress;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.Comparator;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.SortedMap;
037import java.util.TreeMap;
038import java.util.concurrent.atomic.LongAdder;
039
040import org.apache.commons.io.IOUtils;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FSDataInputStream;
043import org.apache.hadoop.fs.FSDataOutputStream;
044import org.apache.hadoop.fs.FileStatus;
045import org.apache.hadoop.fs.FileSystem;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.fs.PathFilter;
048import org.apache.hadoop.hbase.Cell;
049import org.apache.hadoop.hbase.CellComparator;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054import org.apache.hadoop.hbase.fs.HFileSystem;
055import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
056import org.apache.hadoop.hbase.io.MetricsIO;
057import org.apache.hadoop.hbase.io.MetricsIOWrapperImpl;
058import org.apache.hadoop.hbase.io.compress.Compression;
059import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
060import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
061import org.apache.hadoop.hbase.regionserver.CellSink;
062import org.apache.hadoop.hbase.regionserver.ShipperListener;
063import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
064import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.BytesBytesPair;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.HFileProtos;
068import org.apache.hadoop.hbase.util.BloomFilterWriter;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.FSUtils;
071import org.apache.hadoop.io.Writable;
072
073import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
074import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
075
076/**
077 * File format for hbase.
078 * A file of sorted key/value pairs. Both keys and values are byte arrays.
079 * <p>
080 * The memory footprint of a HFile includes the following (below is taken from the
 * <a
 * href="https://issues.apache.org/jira/browse/HADOOP-3315">TFile</a> documentation
083 * but applies also to HFile):
084 * <ul>
085 * <li>Some constant overhead of reading or writing a compressed block.
086 * <ul>
087 * <li>Each compressed block requires one compression/decompression codec for
088 * I/O.
089 * <li>Temporary space to buffer the key.
090 * <li>Temporary space to buffer the value.
091 * </ul>
092 * <li>HFile index, which is proportional to the total number of Data Blocks.
093 * The total amount of memory needed to hold the index can be estimated as
094 * (56+AvgKeySize)*NumBlocks.
095 * </ul>
096 * Suggestions on performance optimization.
097 * <ul>
 * <li>Minimum block size. We recommend a setting of minimum block size between
 * 8KB and 1MB for general usage. Larger block size is preferred if files are
 * primarily for sequential access. However, it would lead to inefficient random
 * access (because there is more data to decompress). Smaller blocks are good
102 * for random access, but require more memory to hold the block index, and may
103 * be slower to create (because we must flush the compressor stream at the
104 * conclusion of each data block, which leads to an FS I/O flush). Further, due
105 * to the internal caching in Compression codec, the smallest possible block
106 * size would be around 20KB-30KB.
107 * <li>The current implementation does not offer true multi-threading for
108 * reading. The implementation uses FSDataInputStream seek()+read(), which is
109 * shown to be much faster than positioned-read call in single thread mode.
110 * However, it also means that if multiple threads attempt to access the same
111 * HFile (using multiple scanners) simultaneously, the actual I/O is carried out
112 * sequentially even if they access different DFS blocks (Reexamine! pread seems
113 * to be 10% faster than seek+read in my testing -- stack).
 * <li>Compression codec. Use "none" if the data is not very compressible (by
 * compressible, I mean a compression ratio of at least 2:1). Generally, use "lzo"
 * as the starting point for experimenting. "gz" offers a slightly better
 * compression ratio than "lzo" but requires 4x CPU to compress and 2x CPU to
 * decompress, compared to "lzo".
119 * </ul>
120 *
 * For more on the background behind HFile, see <a
 * href="https://issues.apache.org/jira/browse/HBASE-61">HBASE-61</a>.
123 * <p>
124 * File is made of data blocks followed by meta data blocks (if any), a fileinfo
125 * block, data block index, meta data block index, and a fixed size trailer
126 * which records the offsets at which file changes content type.
127 * <pre>&lt;data blocks&gt;&lt;meta blocks&gt;&lt;fileinfo&gt;&lt;
128 * data index&gt;&lt;meta index&gt;&lt;trailer&gt;</pre>
 * Each block has a bit of magic at its start.  Blocks are composed of
 * key/values.  In data blocks, they are both byte arrays.  Metadata blocks are
 * a String key and a byte array value.  An empty file looks like this:
 * <pre>&lt;fileinfo&gt;&lt;trailer&gt;</pre>.  That is, there are neither data nor meta
 * blocks present.
134 * <p>
135 * TODO: Do scanners need to be able to take a start and end row?
136 * TODO: Should BlockIndex know the name of its file?  Should it have a Path
137 * that points at its file say for the case where an index lives apart from
138 * an HFile instance?
139 */
140@InterfaceAudience.Private
141public class HFile {
142  // LOG is being used in HFileBlock and CheckSumUtil
143  static final Logger LOG = LoggerFactory.getLogger(HFile.class);
144
145  /**
146   * Maximum length of key in HFile.
147   */
148  public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE;
149
150  /**
151   * Default compression: none.
152   */
153  public final static Compression.Algorithm DEFAULT_COMPRESSION_ALGORITHM =
154    Compression.Algorithm.NONE;
155
156  /** Minimum supported HFile format version */
157  public static final int MIN_FORMAT_VERSION = 2;
158
159  /** Maximum supported HFile format version
160   */
161  public static final int MAX_FORMAT_VERSION = 3;
162
163  /**
164   * Minimum HFile format version with support for persisting cell tags
165   */
166  public static final int MIN_FORMAT_VERSION_WITH_TAGS = 3;
167
168  /** Default compression name: none. */
169  public final static String DEFAULT_COMPRESSION =
170    DEFAULT_COMPRESSION_ALGORITHM.getName();
171
172  /** Meta data block name for bloom filter bits. */
173  public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
174
175  /**
176   * We assume that HFile path ends with
177   * ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
178   * many levels of nesting. This is needed for identifying table and CF name
179   * from an HFile path.
180   */
181  public final static int MIN_NUM_HFILE_PATH_LEVELS = 5;
182
183  /**
184   * The number of bytes per checksum.
185   */
186  public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
187
188  // For measuring number of checksum failures
189  static final LongAdder CHECKSUM_FAILURES = new LongAdder();
190
191  // For tests. Gets incremented when we read a block whether from HDFS or from Cache.
192  public static final LongAdder DATABLOCK_READ_COUNT = new LongAdder();
193
194  /** Static instance for the metrics so that HFileReaders access the same instance */
195  static final MetricsIO metrics = new MetricsIO(new MetricsIOWrapperImpl());
196
197  /**
198   * Number of checksum verification failures. It also
199   * clears the counter.
200   */
201  public static final long getAndResetChecksumFailuresCount() {
202    return CHECKSUM_FAILURES.sumThenReset();
203  }
204
205  /**
206   * Number of checksum verification failures. It also
207   * clears the counter.
208   */
209  public static final long getChecksumFailuresCount() {
210    return CHECKSUM_FAILURES.sum();
211  }
212
213  public static final void updateReadLatency(long latencyMillis, boolean pread) {
214    if (pread) {
215      metrics.updateFsPreadTime(latencyMillis);
216    } else {
217      metrics.updateFsReadTime(latencyMillis);
218    }
219  }
220
221  public static final void updateWriteLatency(long latencyMillis) {
222    metrics.updateFsWriteTime(latencyMillis);
223  }
224
225  /** API required to write an {@link HFile} */
226  public interface Writer extends Closeable, CellSink, ShipperListener {
227    /** Max memstore (mvcc) timestamp in FileInfo */
228    public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
229
230    /** Add an element to the file info map. */
231    void appendFileInfo(byte[] key, byte[] value) throws IOException;
232
233    /** @return the path to this {@link HFile} */
234    Path getPath();
235
236    /**
237     * Adds an inline block writer such as a multi-level block index writer or
238     * a compound Bloom filter writer.
239     */
240    void addInlineBlockWriter(InlineBlockWriter bloomWriter);
241
242    // The below three methods take Writables.  We'd like to undo Writables but undoing the below would be pretty
243    // painful.  Could take a byte [] or a Message but we want to be backward compatible around hfiles so would need
244    // to map between Message and Writable or byte [] and current Writable serialization.  This would be a bit of work
245    // to little gain.  Thats my thinking at moment.  St.Ack 20121129
246
247    void appendMetaBlock(String bloomFilterMetaKey, Writable metaWriter);
248
249    /**
250     * Store general Bloom filter in the file. This does not deal with Bloom filter
251     * internals but is necessary, since Bloom filters are stored differently
252     * in HFile version 1 and version 2.
253     */
254    void addGeneralBloomFilter(BloomFilterWriter bfw);
255
256    /**
257     * Store delete family Bloom filter in the file, which is only supported in
258     * HFile V2.
259     */
260    void addDeleteFamilyBloomFilter(BloomFilterWriter bfw) throws IOException;
261
262    /**
263     * Return the file context for the HFile this writer belongs to
264     */
265    HFileContext getFileContext();
266  }
267
268  /**
269   * This variety of ways to construct writers is used throughout the code, and
270   * we want to be able to swap writer implementations.
271   */
272  public static class WriterFactory {
273    protected final Configuration conf;
274    protected final CacheConfig cacheConf;
275    protected FileSystem fs;
276    protected Path path;
277    protected FSDataOutputStream ostream;
278    protected CellComparator comparator = CellComparator.getInstance();
279    protected InetSocketAddress[] favoredNodes;
280    private HFileContext fileContext;
281    protected boolean shouldDropBehind = false;
282
283    WriterFactory(Configuration conf, CacheConfig cacheConf) {
284      this.conf = conf;
285      this.cacheConf = cacheConf;
286    }
287
288    public WriterFactory withPath(FileSystem fs, Path path) {
289      Preconditions.checkNotNull(fs);
290      Preconditions.checkNotNull(path);
291      this.fs = fs;
292      this.path = path;
293      return this;
294    }
295
296    public WriterFactory withOutputStream(FSDataOutputStream ostream) {
297      Preconditions.checkNotNull(ostream);
298      this.ostream = ostream;
299      return this;
300    }
301
302    public WriterFactory withComparator(CellComparator comparator) {
303      Preconditions.checkNotNull(comparator);
304      this.comparator = comparator;
305      return this;
306    }
307
308    public WriterFactory withFavoredNodes(InetSocketAddress[] favoredNodes) {
309      // Deliberately not checking for null here.
310      this.favoredNodes = favoredNodes;
311      return this;
312    }
313
314    public WriterFactory withFileContext(HFileContext fileContext) {
315      this.fileContext = fileContext;
316      return this;
317    }
318
319    public WriterFactory withShouldDropCacheBehind(boolean shouldDropBehind) {
320      this.shouldDropBehind = shouldDropBehind;
321      return this;
322    }
323
324
325    public Writer create() throws IOException {
326      if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
327        throw new AssertionError("Please specify exactly one of " +
328            "filesystem/path or path");
329      }
330      if (path != null) {
331        ostream = HFileWriterImpl.createOutputStream(conf, fs, path, favoredNodes);
332        try {
333          ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction());
334        } catch (UnsupportedOperationException uoe) {
335          LOG.trace("Unable to set drop behind on {}", path, uoe);
336          LOG.debug("Unable to set drop behind on {}", path.getName());
337        }
338      }
339      return new HFileWriterImpl(conf, cacheConf, path, ostream, comparator, fileContext);
340    }
341  }
342
343  /** The configuration key for HFile version to use for new files */
344  public static final String FORMAT_VERSION_KEY = "hfile.format.version";
345
346  public static int getFormatVersion(Configuration conf) {
347    int version = conf.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
348    checkFormatVersion(version);
349    return version;
350  }
351
352  /**
353   * Returns the factory to be used to create {@link HFile} writers.
354   * Disables block cache access for all writers created through the
355   * returned factory.
356   */
357  public static final WriterFactory getWriterFactoryNoCache(Configuration
358       conf) {
359    return HFile.getWriterFactory(conf, CacheConfig.DISABLED);
360  }
361
362  /**
363   * Returns the factory to be used to create {@link HFile} writers
364   */
365  public static final WriterFactory getWriterFactory(Configuration conf,
366      CacheConfig cacheConf) {
367    int version = getFormatVersion(conf);
368    switch (version) {
369    case 2:
370      throw new IllegalArgumentException("This should never happen. " +
371        "Did you change hfile.format.version to read v2? This version of the software writes v3" +
372        " hfiles only (but it can read v2 files without having to update hfile.format.version " +
373        "in hbase-site.xml)");
374    case 3:
375      return new HFile.WriterFactory(conf, cacheConf);
376    default:
377      throw new IllegalArgumentException("Cannot create writer for HFile " +
378          "format version " + version);
379    }
380  }
381
382  /**
383   * An abstraction used by the block index.
384   * Implementations will check cache for any asked-for block and return cached block if found.
385   * Otherwise, after reading from fs, will try and put block into cache before returning.
386   */
387  public interface CachingBlockReader {
388    /**
389     * Read in a file block.
390     * @param offset offset to read.
391     * @param onDiskBlockSize size of the block
392     * @param cacheBlock
393     * @param pread
394     * @param isCompaction is this block being read as part of a compaction
395     * @param expectedBlockType the block type we are expecting to read with this read operation,
396     *  or null to read whatever block type is available and avoid checking (that might reduce
397     *  caching efficiency of encoded data blocks)
398     * @param expectedDataBlockEncoding the data block encoding the caller is expecting data blocks
399     *  to be in, or null to not perform this check and return the block irrespective of the
400     *  encoding. This check only applies to data blocks and can be set to null when the caller is
401     *  expecting to read a non-data block and has set expectedBlockType accordingly.
402     * @return Block wrapped in a ByteBuffer.
403     * @throws IOException
404     */
405    HFileBlock readBlock(long offset, long onDiskBlockSize,
406        boolean cacheBlock, final boolean pread, final boolean isCompaction,
407        final boolean updateCacheMetrics, BlockType expectedBlockType,
408        DataBlockEncoding expectedDataBlockEncoding)
409        throws IOException;
410
411    /**
412     * Return the given block back to the cache, if it was obtained from cache.
413     * @param block Block to be returned.
414     */
415    void returnBlock(HFileBlock block);
416  }
417
418  /** An interface used by clients to open and iterate an {@link HFile}. */
419  public interface Reader extends Closeable, CachingBlockReader {
420    /**
421     * Returns this reader's "name". Usually the last component of the path.
422     * Needs to be constant as the file is being moved to support caching on
423     * write.
424     */
425    String getName();
426
427    CellComparator getComparator();
428
429    HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction);
430
431    HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;
432
433    Map<byte[], byte[]> loadFileInfo() throws IOException;
434
435    Optional<Cell> getLastKey();
436
437    Optional<Cell> midKey() throws IOException;
438
439    long length();
440
441    long getEntries();
442
443    Optional<Cell> getFirstKey();
444
445    long indexSize();
446
447    Optional<byte[]> getFirstRowKey();
448
449    Optional<byte[]> getLastRowKey();
450
451    FixedFileTrailer getTrailer();
452
453    HFileBlockIndex.BlockIndexReader getDataBlockIndexReader();
454
455    HFileScanner getScanner(boolean cacheBlocks, boolean pread);
456
457    Compression.Algorithm getCompressionAlgorithm();
458
459    /**
460     * Retrieves general Bloom filter metadata as appropriate for each
461     * {@link HFile} version.
462     * Knows nothing about how that metadata is structured.
463     */
464    DataInput getGeneralBloomFilterMetadata() throws IOException;
465
466    /**
467     * Retrieves delete family Bloom filter metadata as appropriate for each
468     * {@link HFile}  version.
469     * Knows nothing about how that metadata is structured.
470     */
471    DataInput getDeleteBloomFilterMetadata() throws IOException;
472
473    Path getPath();
474
475    /** Close method with optional evictOnClose */
476    void close(boolean evictOnClose) throws IOException;
477
478    DataBlockEncoding getDataBlockEncoding();
479
480    boolean hasMVCCInfo();
481
482    /**
483     * Return the file context of the HFile this reader belongs to
484     */
485    HFileContext getFileContext();
486
487    boolean isPrimaryReplicaReader();
488
489    boolean shouldIncludeMemStoreTS();
490
491    boolean isDecodeMemStoreTS();
492
493    DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction);
494
495    @VisibleForTesting
496    HFileBlock.FSReader getUncachedBlockReader();
497
498    @VisibleForTesting
499    boolean prefetchComplete();
500
501    /**
502     * To close the stream's socket. Note: This can be concurrently called from multiple threads and
503     * implementation should take care of thread safety.
504     */
505    void unbufferStream();
506  }
507
508  /**
509   * Method returns the reader given the specified arguments.
510   * TODO This is a bad abstraction.  See HBASE-6635.
511   *
512   * @param path hfile's path
513   * @param fsdis stream of path's file
514   * @param size max size of the trailer.
515   * @param cacheConf Cache configuation values, cannot be null.
516   * @param hfs
517   * @param primaryReplicaReader true if this is a reader for primary replica
518   * @return an appropriate instance of HFileReader
519   * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
520   */
521  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
522      justification="Intentional")
523  private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size,
524      CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader, Configuration conf)
525      throws IOException {
526    FixedFileTrailer trailer = null;
527    try {
528      boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
529      assert !isHBaseChecksum; // Initially we must read with FS checksum.
530      trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
531      switch (trailer.getMajorVersion()) {
532        case 2:
533          LOG.debug("Opening HFile v2 with v3 reader");
534          // Fall through. FindBugs: SF_SWITCH_FALLTHROUGH
535        case 3:
536          return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs,
537              primaryReplicaReader, conf);
538        default:
539          throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion());
540      }
541    } catch (Throwable t) {
542      IOUtils.closeQuietly(fsdis);
543      throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
544    } finally {
545      fsdis.unbuffer();
546    }
547  }
548
549  /**
550   * The sockets and the file descriptors held by the method parameter
551   * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
552   * that no other threads have access to the same passed reference.
553   * @param fs A file system
554   * @param path Path to HFile
555   * @param fsdis a stream of path's file
556   * @param size max size of the trailer.
557   * @param cacheConf Cache configuration for hfile's contents
558   * @param primaryReplicaReader true if this is a reader for primary replica
559   * @param conf Configuration
560   * @return A version specific Hfile Reader
561   * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
562   */
563  public static Reader createReader(FileSystem fs, Path path, FSDataInputStreamWrapper fsdis,
564      long size, CacheConfig cacheConf, boolean primaryReplicaReader, Configuration conf)
565      throws IOException {
566    HFileSystem hfs = null;
567
568    // If the fs is not an instance of HFileSystem, then create an
569    // instance of HFileSystem that wraps over the specified fs.
570    // In this case, we will not be able to avoid checksumming inside
571    // the filesystem.
572    if (!(fs instanceof HFileSystem)) {
573      hfs = new HFileSystem(fs);
574    } else {
575      hfs = (HFileSystem) fs;
576    }
577    return openReader(path, fsdis, size, cacheConf, hfs, primaryReplicaReader, conf);
578  }
579
580  /**
581  * Creates reader with cache configuration disabled
582  * @param fs filesystem
583  * @param path Path to file to read
584  * @return an active Reader instance
585  * @throws IOException Will throw a CorruptHFileException
586  * (DoNotRetryIOException subtype) if hfile is corrupt/invalid.
587  */
588  public static Reader createReader(FileSystem fs, Path path, Configuration conf)
589      throws IOException {
590    // The primaryReplicaReader is mainly used for constructing block cache key, so if we do not use
591    // block cache then it is OK to set it as any value. We use true here.
592    return createReader(fs, path, CacheConfig.DISABLED, true, conf);
593  }
594
595  /**
596   * @param fs filesystem
597   * @param path Path to file to read
598   * @param cacheConf This must not be null. @see
599   *          {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
600   * @param primaryReplicaReader true if this is a reader for primary replica
601   * @return an active Reader instance
602   * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile
603   *           is corrupt/invalid.
604   */
605  public static Reader createReader(FileSystem fs, Path path, CacheConfig cacheConf,
606      boolean primaryReplicaReader, Configuration conf) throws IOException {
607    Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
608    FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
609    return openReader(path, stream, fs.getFileStatus(path).getLen(), cacheConf,
610      stream.getHfs(), primaryReplicaReader, conf);
611  }
612
613  /**
614   * This factory method is used only by unit tests. <br/>
615   * The sockets and the file descriptors held by the method parameter
616   * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
617   * that no other threads have access to the same passed reference.
618   */
619  @VisibleForTesting
620  static Reader createReaderFromStream(Path path, FSDataInputStream fsdis, long size,
621      CacheConfig cacheConf, Configuration conf) throws IOException {
622    FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
623    return openReader(path, wrapper, size, cacheConf, null, true, conf);
624  }
625
626  /**
627   * Returns true if the specified file has a valid HFile Trailer.
628   * @param fs filesystem
629   * @param path Path to file to verify
630   * @return true if the file has a valid HFile Trailer, otherwise false
631   * @throws IOException if failed to read from the underlying stream
632   */
633  public static boolean isHFileFormat(final FileSystem fs, final Path path) throws IOException {
634    return isHFileFormat(fs, fs.getFileStatus(path));
635  }
636
637  /**
638   * Returns true if the specified file has a valid HFile Trailer.
639   * @param fs filesystem
640   * @param fileStatus the file to verify
641   * @return true if the file has a valid HFile Trailer, otherwise false
642   * @throws IOException if failed to read from the underlying stream
643   */
644  public static boolean isHFileFormat(final FileSystem fs, final FileStatus fileStatus)
645      throws IOException {
646    final Path path = fileStatus.getPath();
647    final long size = fileStatus.getLen();
648    try (FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path)) {
649      boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
650      assert !isHBaseChecksum; // Initially we must read with FS checksum.
651      FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size);
652      return true;
653    } catch (IllegalArgumentException e) {
654      return false;
655    }
656  }
657
658  /**
659   * Metadata for this file. Conjured by the writer. Read in by the reader.
660   */
661  public static class FileInfo implements SortedMap<byte[], byte[]> {
662    static final String RESERVED_PREFIX = "hfile.";
663    static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
664    static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
665    static final byte [] AVG_KEY_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
666    static final byte [] AVG_VALUE_LEN = Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
667    static final byte [] CREATE_TIME_TS = Bytes.toBytes(RESERVED_PREFIX + "CREATE_TIME_TS");
668    static final byte [] COMPARATOR = Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
669    static final byte [] TAGS_COMPRESSED = Bytes.toBytes(RESERVED_PREFIX + "TAGS_COMPRESSED");
670    public static final byte [] MAX_TAGS_LEN = Bytes.toBytes(RESERVED_PREFIX + "MAX_TAGS_LEN");
671    private final SortedMap<byte [], byte []> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
672
673    public FileInfo() {
674      super();
675    }
676
677    /**
678     * Append the given key/value pair to the file info, optionally checking the
679     * key prefix.
680     *
681     * @param k key to add
682     * @param v value to add
683     * @param checkPrefix whether to check that the provided key does not start
684     *          with the reserved prefix
685     * @return this file info object
686     * @throws IOException if the key or value is invalid
687     */
688    public FileInfo append(final byte[] k, final byte[] v,
689        final boolean checkPrefix) throws IOException {
690      if (k == null || v == null) {
691        throw new NullPointerException("Key nor value may be null");
692      }
693      if (checkPrefix && isReservedFileInfoKey(k)) {
694        throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX
695            + " are reserved");
696      }
697      put(k, v);
698      return this;
699    }
700
701    @Override
702    public void clear() {
703      this.map.clear();
704    }
705
706    @Override
707    public Comparator<? super byte[]> comparator() {
708      return map.comparator();
709    }
710
711    @Override
712    public boolean containsKey(Object key) {
713      return map.containsKey(key);
714    }
715
716    @Override
717    public boolean containsValue(Object value) {
718      return map.containsValue(value);
719    }
720
721    @Override
722    public Set<java.util.Map.Entry<byte[], byte[]>> entrySet() {
723      return map.entrySet();
724    }
725
726    @Override
727    public boolean equals(Object o) {
728      return map.equals(o);
729    }
730
731    @Override
732    public byte[] firstKey() {
733      return map.firstKey();
734    }
735
736    @Override
737    public byte[] get(Object key) {
738      return map.get(key);
739    }
740
741    @Override
742    public int hashCode() {
743      return map.hashCode();
744    }
745
746    @Override
747    public SortedMap<byte[], byte[]> headMap(byte[] toKey) {
748      return this.map.headMap(toKey);
749    }
750
751    @Override
752    public boolean isEmpty() {
753      return map.isEmpty();
754    }
755
756    @Override
757    public Set<byte[]> keySet() {
758      return map.keySet();
759    }
760
761    @Override
762    public byte[] lastKey() {
763      return map.lastKey();
764    }
765
766    @Override
767    public byte[] put(byte[] key, byte[] value) {
768      return this.map.put(key, value);
769    }
770
771    @Override
772    public void putAll(Map<? extends byte[], ? extends byte[]> m) {
773      this.map.putAll(m);
774    }
775
776    @Override
777    public byte[] remove(Object key) {
778      return this.map.remove(key);
779    }
780
781    @Override
782    public int size() {
783      return map.size();
784    }
785
786    @Override
787    public SortedMap<byte[], byte[]> subMap(byte[] fromKey, byte[] toKey) {
788      return this.map.subMap(fromKey, toKey);
789    }
790
791    @Override
792    public SortedMap<byte[], byte[]> tailMap(byte[] fromKey) {
793      return this.map.tailMap(fromKey);
794    }
795
796    @Override
797    public Collection<byte[]> values() {
798      return map.values();
799    }
800
801    /**
802     * Write out this instance on the passed in <code>out</code> stream.
803     * We write it as a protobuf.
804     * @param out
805     * @throws IOException
806     * @see #read(DataInputStream)
807     */
808    void write(final DataOutputStream out) throws IOException {
809      HFileProtos.FileInfoProto.Builder builder = HFileProtos.FileInfoProto.newBuilder();
810      for (Map.Entry<byte [], byte[]> e: this.map.entrySet()) {
811        HBaseProtos.BytesBytesPair.Builder bbpBuilder = HBaseProtos.BytesBytesPair.newBuilder();
812        bbpBuilder.setFirst(UnsafeByteOperations.unsafeWrap(e.getKey()));
813        bbpBuilder.setSecond(UnsafeByteOperations.unsafeWrap(e.getValue()));
814        builder.addMapEntry(bbpBuilder.build());
815      }
816      out.write(ProtobufMagic.PB_MAGIC);
817      builder.build().writeDelimitedTo(out);
818    }
819
820    /**
821     * Populate this instance with what we find on the passed in <code>in</code> stream.
822     * Can deserialize protobuf of old Writables format.
823     * @param in
824     * @throws IOException
825     * @see #write(DataOutputStream)
826     */
827    void read(final DataInputStream in) throws IOException {
828      // This code is tested over in TestHFileReaderV1 where we read an old hfile w/ this new code.
829      int pblen = ProtobufUtil.lengthOfPBMagic();
830      byte [] pbuf = new byte[pblen];
831      if (in.markSupported()) in.mark(pblen);
832      int read = in.read(pbuf);
833      if (read != pblen) throw new IOException("read=" + read + ", wanted=" + pblen);
834      if (ProtobufUtil.isPBMagicPrefix(pbuf)) {
835        parsePB(HFileProtos.FileInfoProto.parseDelimitedFrom(in));
836      } else {
837        if (in.markSupported()) {
838          in.reset();
839          parseWritable(in);
840        } else {
841          // We cannot use BufferedInputStream, it consumes more than we read from the underlying IS
842          ByteArrayInputStream bais = new ByteArrayInputStream(pbuf);
843          SequenceInputStream sis = new SequenceInputStream(bais, in); // Concatenate input streams
844          // TODO: Am I leaking anything here wrapping the passed in stream?  We are not calling close on the wrapped
845          // streams but they should be let go after we leave this context?  I see that we keep a reference to the
846          // passed in inputstream but since we no longer have a reference to this after we leave, we should be ok.
847          parseWritable(new DataInputStream(sis));
848        }
849      }
850    }
851
852    /** Now parse the old Writable format.  It was a list of Map entries.  Each map entry was a key and a value of
853     * a byte [].  The old map format had a byte before each entry that held a code which was short for the key or
854     * value type.  We know it was a byte [] so in below we just read and dump it.
855     * @throws IOException
856     */
857    void parseWritable(final DataInputStream in) throws IOException {
858      // First clear the map.  Otherwise we will just accumulate entries every time this method is called.
859      this.map.clear();
860      // Read the number of entries in the map
861      int entries = in.readInt();
862      // Then read each key/value pair
863      for (int i = 0; i < entries; i++) {
864        byte [] key = Bytes.readByteArray(in);
865        // We used to read a byte that encoded the class type.  Read and ignore it because it is always byte [] in hfile
866        in.readByte();
867        byte [] value = Bytes.readByteArray(in);
868        this.map.put(key, value);
869      }
870    }
871
872    /**
873     * Fill our map with content of the pb we read off disk
874     * @param fip protobuf message to read
875     */
876    void parsePB(final HFileProtos.FileInfoProto fip) {
877      this.map.clear();
878      for (BytesBytesPair pair: fip.getMapEntryList()) {
879        this.map.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
880      }
881    }
882  }
883
884  /** Return true if the given file info key is reserved for internal use. */
885  public static boolean isReservedFileInfoKey(byte[] key) {
886    return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
887  }
888
889  /**
890   * Get names of supported compression algorithms. The names are acceptable by
891   * HFile.Writer.
892   *
893   * @return Array of strings, each represents a supported compression
894   *         algorithm. Currently, the following compression algorithms are
895   *         supported.
896   *         <ul>
897   *         <li>"none" - No compression.
898   *         <li>"gz" - GZIP compression.
899   *         </ul>
900   */
901  public static String[] getSupportedCompressionAlgorithms() {
902    return Compression.getSupportedAlgorithms();
903  }
904
905  // Utility methods.
906  /*
907   * @param l Long to convert to an int.
908   * @return <code>l</code> cast as an int.
909   */
910  static int longToInt(final long l) {
911    // Expecting the size() of a block not exceeding 4GB. Assuming the
912    // size() will wrap to negative integer if it exceeds 2GB (From tfile).
913    return (int)(l & 0x00000000ffffffffL);
914  }
915
916  /**
917   * Returns all HFiles belonging to the given region directory. Could return an
918   * empty list.
919   *
920   * @param fs  The file system reference.
921   * @param regionDir  The region directory to scan.
922   * @return The list of files found.
923   * @throws IOException When scanning the files fails.
924   */
925  static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
926      throws IOException {
927    List<Path> regionHFiles = new ArrayList<>();
928    PathFilter dirFilter = new FSUtils.DirFilter(fs);
929    FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter);
930    for(FileStatus dir : familyDirs) {
931      FileStatus[] files = fs.listStatus(dir.getPath());
932      for (FileStatus file : files) {
933        if (!file.isDirectory() &&
934            (!file.getPath().toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) &&
935            (!file.getPath().toString().contains(HConstants.RECOVERED_EDITS_DIR))) {
936          regionHFiles.add(file.getPath());
937        }
938      }
939    }
940    return regionHFiles;
941  }
942
943  /**
944   * Checks the given {@link HFile} format version, and throws an exception if
945   * invalid. Note that if the version number comes from an input file and has
946   * not been verified, the caller needs to re-throw an {@link IOException} to
947   * indicate that this is not a software error, but corrupted input.
948   *
949   * @param version an HFile version
950   * @throws IllegalArgumentException if the version is invalid
951   */
952  public static void checkFormatVersion(int version)
953      throws IllegalArgumentException {
954    if (version < MIN_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
955      throw new IllegalArgumentException("Invalid HFile version: " + version
956          + " (expected to be " + "between " + MIN_FORMAT_VERSION + " and "
957          + MAX_FORMAT_VERSION + ")");
958    }
959  }
960
961
962  public static void checkHFileVersion(final Configuration c) {
963    int version = c.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION);
964    if (version < MAX_FORMAT_VERSION || version > MAX_FORMAT_VERSION) {
965      throw new IllegalArgumentException("The setting for " + FORMAT_VERSION_KEY +
966        " (in your hbase-*.xml files) is " + version + " which does not match " +
967        MAX_FORMAT_VERSION +
968        "; are you running with a configuration from an older or newer hbase install (an " +
969        "incompatible hbase-default.xml or hbase-site.xml on your CLASSPATH)?");
970    }
971  }
972
  /**
   * Command-line entry point. Forwards <code>args</code> unchanged to
   * <code>HFilePrettyPrinter.main</code>.
   *
   * @param args command-line arguments, passed through as-is
   * @throws Exception whatever <code>HFilePrettyPrinter.main</code> throws
   */
  public static void main(String[] args) throws Exception {
    // delegate to preserve old behavior
    HFilePrettyPrinter.main(args);
  }
977}