001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.regionserver;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.text.DecimalFormat;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Locale;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.KeyValue;
036import org.apache.hadoop.hbase.KeyValueUtil;
037import org.apache.hadoop.hbase.io.compress.Compression;
038import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
039import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
040import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
041import org.apache.hadoop.hbase.io.encoding.EncodedDataBlock;
042import org.apache.hadoop.hbase.io.hfile.CacheConfig;
043import org.apache.hadoop.hbase.io.hfile.HFileBlock;
044import org.apache.hadoop.hbase.io.hfile.HFileContext;
045import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
046import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.io.WritableUtils;
049import org.apache.hadoop.io.compress.CompressionOutputStream;
050import org.apache.hadoop.io.compress.Compressor;
051import org.apache.hadoop.io.compress.Decompressor;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
055import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
056import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
057import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
058import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
059import org.apache.hbase.thirdparty.org.apache.commons.cli.PosixParser;
060
061/**
062 * Tests various algorithms for key compression on an existing HFile. Useful
063 * for testing, debugging and benchmarking.
064 */
065public class DataBlockEncodingTool {
066  private static final Logger LOG = LoggerFactory.getLogger(
067      DataBlockEncodingTool.class);
068
069  private static final boolean includesMemstoreTS = true;
070
071  /**
072   * How many times to run the benchmark. More times means better data in terms
073   * of statistics but slower execution. Has to be strictly larger than
074   * {@link #DEFAULT_BENCHMARK_N_OMIT}.
075   */
076  private static final int DEFAULT_BENCHMARK_N_TIMES = 12;
077
078  /**
079   * How many first runs should not be included in the benchmark. Done in order
080   * to exclude setup cost.
081   */
082  private static final int DEFAULT_BENCHMARK_N_OMIT = 2;
083
084  /** HFile name to be used in benchmark */
085  private static final String OPT_HFILE_NAME = "f";
086
087  /** Maximum number of key/value pairs to process in a single benchmark run */
088  private static final String OPT_KV_LIMIT = "n";
089
090  /** Whether to run a benchmark to measure read throughput */
091  private static final String OPT_MEASURE_THROUGHPUT = "b";
092
093  /** If this is specified, no correctness testing will be done */
094  private static final String OPT_OMIT_CORRECTNESS_TEST = "c";
095
096  /** What compression algorithm to test */
097  private static final String OPT_COMPRESSION_ALGORITHM = "a";
098
099  /** Number of times to run each benchmark */
100  private static final String OPT_BENCHMARK_N_TIMES = "t";
101
102  /** Number of first runs of every benchmark to omit from statistics */
103  private static final String OPT_BENCHMARK_N_OMIT = "omit";
104
105  /** Compression algorithm to use if not specified on the command line */
106  private static final Algorithm DEFAULT_COMPRESSION =
107      Compression.Algorithm.GZ;
108
109  private static final DecimalFormat DELIMITED_DECIMAL_FORMAT =
110      new DecimalFormat();
111
112  static {
113    DELIMITED_DECIMAL_FORMAT.setGroupingSize(3);
114  }
115
116  private static final String PCT_FORMAT = "%.2f %%";
117  private static final String INT_FORMAT = "%d";
118
119  private static int benchmarkNTimes = DEFAULT_BENCHMARK_N_TIMES;
120  private static int benchmarkNOmit = DEFAULT_BENCHMARK_N_OMIT;
121
122  private List<EncodedDataBlock> codecs = new ArrayList<>();
123  private long totalPrefixLength = 0;
124  private long totalKeyLength = 0;
125  private long totalValueLength = 0;
126  private long totalKeyRedundancyLength = 0;
127  private long totalCFLength = 0;
128
129  private byte[] rawKVs;
130  private boolean useHBaseChecksum = false;
131
132  private final String compressionAlgorithmName;
133  private final Algorithm compressionAlgorithm;
134  private final Compressor compressor;
135  private final Decompressor decompressor;
136
137  // Check if HFile use Tag.
138  private static boolean USE_TAG = false;
139
140  private enum Manipulation {
141    ENCODING,
142    DECODING,
143    COMPRESSION,
144    DECOMPRESSION;
145
146    @Override
147    public String toString() {
148      String s = super.toString();
149      StringBuilder sb = new StringBuilder();
150      sb.append(s.charAt(0));
151      sb.append(s.substring(1).toLowerCase(Locale.ROOT));
152      return sb.toString();
153    }
154  }
155
156  /**
157   * @param compressionAlgorithmName What kind of algorithm should be used
158   *                                 as baseline for comparison (e.g. lzo, gz).
159   */
160  public DataBlockEncodingTool(String compressionAlgorithmName) {
161    this.compressionAlgorithmName = compressionAlgorithmName;
162    this.compressionAlgorithm = Compression.getCompressionAlgorithmByName(
163        compressionAlgorithmName);
164    this.compressor = this.compressionAlgorithm.getCompressor();
165    this.decompressor = this.compressionAlgorithm.getDecompressor();
166  }
167
168  /**
169   * Check statistics for given HFile for different data block encoders.
170   * @param scanner Of file which will be compressed.
171   * @param kvLimit Maximal count of KeyValue which will be processed.
172   * @throws IOException thrown if scanner is invalid
173   */
174  public void checkStatistics(final KeyValueScanner scanner, final int kvLimit)
175      throws IOException {
176    scanner.seek(KeyValue.LOWESTKEY);
177
178    KeyValue currentKV;
179
180    byte[] previousKey = null;
181    byte[] currentKey;
182
183    DataBlockEncoding[] encodings = DataBlockEncoding.values();
184
185    ByteArrayOutputStream uncompressedOutputStream =
186        new ByteArrayOutputStream();
187
188    int j = 0;
189    while ((currentKV = KeyValueUtil.ensureKeyValue(scanner.next())) != null && j < kvLimit) {
190      // Iterates through key/value pairs
191      j++;
192      currentKey = currentKV.getKey();
193      if (previousKey != null) {
194        for (int i = 0; i < previousKey.length && i < currentKey.length &&
195            previousKey[i] == currentKey[i]; ++i) {
196          totalKeyRedundancyLength++;
197        }
198      }
199
200      // Add tagsLen zero to cells don't include tags. Since the process of
201      // scanner converts byte array to KV would abandon tagsLen part if tagsLen
202      // is zero. But we still needs the tagsLen part to check if current cell
203      // include tags. If USE_TAG is true, HFile contains cells with tags,
204      // if the cell tagsLen equals 0, it means other cells may have tags.
205      if (USE_TAG && currentKV.getTagsLength() == 0) {
206        uncompressedOutputStream.write(currentKV.getBuffer(),
207            currentKV.getOffset(), currentKV.getLength());
208        // write tagsLen = 0.
209        uncompressedOutputStream.write(Bytes.toBytes((short) 0));
210      } else {
211        uncompressedOutputStream.write(currentKV.getBuffer(),
212            currentKV.getOffset(), currentKV.getLength());
213      }
214
215      if(includesMemstoreTS) {
216        WritableUtils.writeVLong(
217            new DataOutputStream(uncompressedOutputStream), currentKV.getSequenceId());
218      }
219
220      previousKey = currentKey;
221
222      int kLen = currentKV.getKeyLength();
223      int vLen = currentKV.getValueLength();
224      int cfLen = currentKV.getFamilyLength(currentKV.getFamilyOffset());
225      int restLen = currentKV.getLength() - kLen - vLen;
226
227      totalKeyLength += kLen;
228      totalValueLength += vLen;
229      totalPrefixLength += restLen;
230      totalCFLength += cfLen;
231    }
232
233    rawKVs = uncompressedOutputStream.toByteArray();
234    for (DataBlockEncoding encoding : encodings) {
235      if (encoding == DataBlockEncoding.NONE) {
236        continue;
237      }
238      DataBlockEncoder d = encoding.getEncoder();
239      HFileContext meta = new HFileContextBuilder()
240          .withDataBlockEncoding(encoding)
241          .withCompression(Compression.Algorithm.NONE)
242          .withIncludesMvcc(includesMemstoreTS)
243          .withIncludesTags(USE_TAG).build();
244      codecs.add(new EncodedDataBlock(d, encoding, rawKVs, meta ));
245    }
246  }
247
248  /**
249   * Verify if all data block encoders are working properly.
250   *
251   * @param scanner Of file which was compressed.
252   * @param kvLimit Maximal count of KeyValue which will be processed.
253   * @return true if all data block encoders compressed/decompressed correctly.
254   * @throws IOException thrown if scanner is invalid
255   */
256  public boolean verifyCodecs(final KeyValueScanner scanner, final int kvLimit)
257      throws IOException {
258    KeyValue currentKv;
259
260    scanner.seek(KeyValue.LOWESTKEY);
261    List<Iterator<Cell>> codecIterators = new ArrayList<>();
262    for(EncodedDataBlock codec : codecs) {
263      codecIterators.add(codec.getIterator(HFileBlock.headerSize(useHBaseChecksum)));
264    }
265
266    int j = 0;
267    while ((currentKv = KeyValueUtil.ensureKeyValue(scanner.next())) != null && j < kvLimit) {
268      // Iterates through key/value pairs
269      ++j;
270      for (Iterator<Cell> it : codecIterators) {
271        Cell c = it.next();
272        KeyValue codecKv = KeyValueUtil.ensureKeyValue(c);
273        if (codecKv == null || 0 != Bytes.compareTo(
274            codecKv.getBuffer(), codecKv.getOffset(), codecKv.getLength(),
275            currentKv.getBuffer(), currentKv.getOffset(),
276            currentKv.getLength())) {
277          if (codecKv == null) {
278            LOG.error("There is a bug in codec " + it +
279                " it returned null KeyValue,");
280          } else {
281            int prefix = 0;
282            int limitLength = 2 * Bytes.SIZEOF_INT +
283                Math.min(codecKv.getLength(), currentKv.getLength());
284            while (prefix < limitLength &&
285                codecKv.getBuffer()[prefix + codecKv.getOffset()] ==
286                currentKv.getBuffer()[prefix + currentKv.getOffset()]) {
287              prefix++;
288            }
289
290            LOG.error("There is bug in codec " + it.toString() +
291                "\n on element " + j +
292                "\n codecKv.getKeyLength() " + codecKv.getKeyLength() +
293                "\n codecKv.getValueLength() " + codecKv.getValueLength() +
294                "\n codecKv.getLength() " + codecKv.getLength() +
295                "\n currentKv.getKeyLength() " + currentKv.getKeyLength() +
296                "\n currentKv.getValueLength() " + currentKv.getValueLength() +
297                "\n codecKv.getLength() " + currentKv.getLength() +
298                "\n currentKV rowLength " + currentKv.getRowLength() +
299                " familyName " + currentKv.getFamilyLength() +
300                " qualifier " + currentKv.getQualifierLength() +
301                "\n prefix " + prefix +
302                "\n codecKv   '" + Bytes.toStringBinary(codecKv.getBuffer(),
303                    codecKv.getOffset(), prefix) + "' diff '" +
304                    Bytes.toStringBinary(codecKv.getBuffer(),
305                        codecKv.getOffset() + prefix, codecKv.getLength() -
306                        prefix) + "'" +
307                "\n currentKv '" + Bytes.toStringBinary(
308                   currentKv.getBuffer(),
309                   currentKv.getOffset(), prefix) + "' diff '" +
310                   Bytes.toStringBinary(currentKv.getBuffer(),
311                       currentKv.getOffset() + prefix, currentKv.getLength() -
312                       prefix) + "'"
313                );
314          }
315          return false;
316        }
317      }
318    }
319
320    LOG.info("Verification was successful!");
321
322    return true;
323  }
324
325  /**
326   * Benchmark codec's speed.
327   */
328  public void benchmarkCodecs() throws IOException {
329    LOG.info("Starting a throughput benchmark for data block encoding codecs");
330    int prevTotalSize = -1;
331    for (EncodedDataBlock codec : codecs) {
332      prevTotalSize = benchmarkEncoder(prevTotalSize, codec);
333    }
334
335    benchmarkDefaultCompression(prevTotalSize, rawKVs);
336  }
337
338  /**
339   * Benchmark compression/decompression throughput.
340   * @param previousTotalSize Total size used for verification. Use -1 if
341   *          unknown.
342   * @param codec Tested encoder.
343   * @return Size of uncompressed data.
344   */
345  private int benchmarkEncoder(int previousTotalSize, EncodedDataBlock codec) {
346    int prevTotalSize = previousTotalSize;
347    int totalSize = 0;
348
349    // decompression time
350    List<Long> durations = new ArrayList<>();
351    for (int itTime = 0; itTime < benchmarkNTimes; ++itTime) {
352      totalSize = 0;
353
354      Iterator<Cell> it;
355
356      it = codec.getIterator(HFileBlock.headerSize(useHBaseChecksum));
357
358      // count only the algorithm time, without memory allocations
359      // (expect first time)
360      final long startTime = System.nanoTime();
361      while (it.hasNext()) {
362        totalSize += KeyValueUtil.ensureKeyValue(it.next()).getLength();
363      }
364      final long finishTime = System.nanoTime();
365      if (itTime >= benchmarkNOmit) {
366        durations.add(finishTime - startTime);
367      }
368
369      if (prevTotalSize != -1 && prevTotalSize != totalSize) {
370        throw new IllegalStateException(String.format(
371            "Algorithm '%s' decoded data to different size", codec.toString()));
372      }
373      prevTotalSize = totalSize;
374    }
375
376    List<Long> encodingDurations = new ArrayList<>();
377    for (int itTime = 0; itTime < benchmarkNTimes; ++itTime) {
378      final long startTime = System.nanoTime();
379      codec.encodeData();
380      final long finishTime = System.nanoTime();
381      if (itTime >= benchmarkNOmit) {
382        encodingDurations.add(finishTime - startTime);
383      }
384    }
385
386    System.out.println(codec.toString() + ":");
387    printBenchmarkResult(totalSize, encodingDurations, Manipulation.ENCODING);
388    printBenchmarkResult(totalSize, durations, Manipulation.DECODING);
389    System.out.println();
390
391    return prevTotalSize;
392  }
393
394  private void benchmarkDefaultCompression(int totalSize, byte[] rawBuffer)
395      throws IOException {
396    benchmarkAlgorithm(compressionAlgorithm,
397        compressionAlgorithmName.toUpperCase(Locale.ROOT), rawBuffer, 0, totalSize);
398  }
399
400  /**
401   * Check decompress performance of a given algorithm and print it.
402   * @param algorithm Compression algorithm.
403   * @param name Name of algorithm.
404   * @param buffer Buffer to be compressed.
405   * @param offset Position of the beginning of the data.
406   * @param length Length of data in buffer.
407   * @throws IOException
408   */
409  public void benchmarkAlgorithm(Compression.Algorithm algorithm, String name,
410      byte[] buffer, int offset, int length) throws IOException {
411    System.out.println(name + ":");
412
413    // compress it
414    List<Long> compressDurations = new ArrayList<>();
415    ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
416    CompressionOutputStream compressingStream =
417        algorithm.createPlainCompressionStream(compressedStream, compressor);
418    try {
419      for (int itTime = 0; itTime < benchmarkNTimes; ++itTime) {
420        final long startTime = System.nanoTime();
421        // The compressedStream should reset before compressingStream resetState since in GZ
422        // resetStatue will write header in the outputstream.
423        compressedStream.reset();
424        compressingStream.resetState();
425        compressingStream.write(buffer, offset, length);
426        compressingStream.flush();
427        compressedStream.toByteArray();
428
429        final long finishTime = System.nanoTime();
430
431        // add time record
432        if (itTime >= benchmarkNOmit) {
433          compressDurations.add(finishTime - startTime);
434        }
435      }
436    } catch (IOException e) {
437      throw new RuntimeException(String.format(
438          "Benchmark, or encoding algorithm '%s' cause some stream problems",
439          name), e);
440    }
441    compressingStream.close();
442    printBenchmarkResult(length, compressDurations, Manipulation.COMPRESSION);
443
444    byte[] compBuffer = compressedStream.toByteArray();
445
446    // uncompress it several times and measure performance
447    List<Long> durations = new ArrayList<>();
448    for (int itTime = 0; itTime < benchmarkNTimes; ++itTime) {
449      final long startTime = System.nanoTime();
450      byte[] newBuf = new byte[length + 1];
451
452      try {
453        ByteArrayInputStream downStream = new ByteArrayInputStream(compBuffer,
454            0, compBuffer.length);
455        InputStream decompressedStream = algorithm.createDecompressionStream(
456            downStream, decompressor, 0);
457
458        int destOffset = 0;
459        int nextChunk;
460        while ((nextChunk = decompressedStream.available()) > 0) {
461          destOffset += decompressedStream.read(newBuf, destOffset, nextChunk);
462        }
463        decompressedStream.close();
464
465      } catch (IOException e) {
466        throw new RuntimeException(String.format(
467            "Decoding path in '%s' algorithm cause exception ", name), e);
468      }
469
470      final long finishTime = System.nanoTime();
471
472      // check correctness
473      if (0 != Bytes.compareTo(buffer, 0, length, newBuf, 0, length)) {
474        int prefix = 0;
475        for(; prefix < buffer.length && prefix < newBuf.length; ++prefix) {
476          if (buffer[prefix] != newBuf[prefix]) {
477            break;
478          }
479        }
480        throw new RuntimeException(String.format(
481            "Algorithm '%s' is corrupting the data", name));
482      }
483
484      // add time record
485      if (itTime >= benchmarkNOmit) {
486        durations.add(finishTime - startTime);
487      }
488    }
489    printBenchmarkResult(length, durations, Manipulation.DECOMPRESSION);
490    System.out.println();
491  }
492
493  private static final double BYTES_IN_MB = 1024 * 1024.0;
494  private static final double NS_IN_SEC = 1000.0 * 1000.0 * 1000.0;
495  private static final double MB_SEC_COEF = NS_IN_SEC / BYTES_IN_MB;
496
497  private static void printBenchmarkResult(int totalSize,
498      List<Long> durationsInNanoSec, Manipulation manipulation) {
499    final int n = durationsInNanoSec.size();
500    long meanTime = 0;
501    for (long time : durationsInNanoSec) {
502      meanTime += time;
503    }
504    meanTime /= n;
505
506    double meanMBPerSec = totalSize * MB_SEC_COEF / meanTime;
507    double mbPerSecSTD = 0;
508    if (n > 0) {
509      for (long time : durationsInNanoSec) {
510        double mbPerSec = totalSize * MB_SEC_COEF / time;
511        double dev = mbPerSec - meanMBPerSec;
512        mbPerSecSTD += dev * dev;
513      }
514      mbPerSecSTD = Math.sqrt(mbPerSecSTD / n);
515    }
516
517    outputTuple(manipulation + " performance", "%6.2f MB/s (+/- %.2f MB/s)",
518         meanMBPerSec, mbPerSecSTD);
519  }
520
521  private static void outputTuple(String caption, String format,
522      Object... values) {
523    if (format.startsWith(INT_FORMAT)) {
524      format = "%s" + format.substring(INT_FORMAT.length());
525      values[0] = DELIMITED_DECIMAL_FORMAT.format(values[0]);
526    }
527
528    StringBuilder sb = new StringBuilder();
529    sb.append("  ");
530    sb.append(caption);
531    sb.append(":");
532
533    String v = String.format(format, values);
534    int padding = 60 - sb.length() - v.length();
535    for (int i = 0; i < padding; ++i) {
536      sb.append(' ');
537    }
538    sb.append(v);
539    System.out.println(sb);
540  }
541
542  /**
543   * Display statistics of different compression algorithms.
544   * @throws IOException
545   */
546  public void displayStatistics() throws IOException {
547    final String comprAlgo = compressionAlgorithmName.toUpperCase(Locale.ROOT);
548    long rawBytes = totalKeyLength + totalPrefixLength + totalValueLength;
549
550    System.out.println("Raw data size:");
551    outputTuple("Raw bytes", INT_FORMAT, rawBytes);
552    outputTuplePct("Key bytes", totalKeyLength);
553    outputTuplePct("Value bytes", totalValueLength);
554    outputTuplePct("KV infrastructure", totalPrefixLength);
555    outputTuplePct("CF overhead", totalCFLength);
556    outputTuplePct("Total key redundancy", totalKeyRedundancyLength);
557
558    int compressedSize = EncodedDataBlock.getCompressedSize(
559        compressionAlgorithm, compressor, rawKVs, 0, rawKVs.length);
560    outputTuple(comprAlgo + " only size", INT_FORMAT,
561        compressedSize);
562    outputSavings(comprAlgo + " only", compressedSize, rawBytes);
563    System.out.println();
564
565    for (EncodedDataBlock codec : codecs) {
566      System.out.println(codec.toString());
567      long encodedBytes = codec.getSize();
568      outputTuple("Encoded bytes", INT_FORMAT, encodedBytes);
569      outputSavings("Key encoding", encodedBytes - totalValueLength,
570          rawBytes - totalValueLength);
571      outputSavings("Total encoding", encodedBytes, rawBytes);
572
573      int encodedCompressedSize = codec.getEncodedCompressedSize(
574          compressionAlgorithm, compressor);
575      outputTuple("Encoding + " + comprAlgo + " size", INT_FORMAT,
576          encodedCompressedSize);
577      outputSavings("Encoding + " + comprAlgo, encodedCompressedSize, rawBytes);
578      outputSavings("Encoding with " + comprAlgo, encodedCompressedSize,
579          compressedSize);
580
581      System.out.println();
582    }
583  }
584
585  private void outputTuplePct(String caption, long size) {
586    outputTuple(caption, INT_FORMAT + " (" + PCT_FORMAT + ")",
587        size, size * 100.0 / rawKVs.length);
588  }
589
590  private void outputSavings(String caption, long part, long whole) {
591    double pct = 100.0 * (1 - 1.0 * part / whole);
592    double times = whole * 1.0 / part;
593    outputTuple(caption + " savings", PCT_FORMAT + " (%.2f x)",
594        pct, times);
595  }
596
597  /**
598   * Test a data block encoder on the given HFile. Output results to console.
599   * @param kvLimit The limit of KeyValue which will be analyzed.
600   * @param hfilePath an HFile path on the file system.
601   * @param compressionName Compression algorithm used for comparison.
602   * @param doBenchmark Run performance benchmarks.
603   * @param doVerify Verify correctness.
604   * @throws IOException When pathName is incorrect.
605   */
606  public static void testCodecs(Configuration conf, int kvLimit,
607      String hfilePath, String compressionName, boolean doBenchmark,
608      boolean doVerify) throws IOException {
609    // create environment
610    Path path = new Path(hfilePath);
611    CacheConfig cacheConf = new CacheConfig(conf);
612    FileSystem fs = FileSystem.get(conf);
613    HStoreFile hsf = new HStoreFile(fs, path, conf, cacheConf, BloomType.NONE, true);
614    hsf.initReader();
615    StoreFileReader reader = hsf.getReader();
616    reader.loadFileInfo();
617    KeyValueScanner scanner = reader.getStoreFileScanner(true, true,
618        false, hsf.getMaxMemStoreTS(), 0, false);
619    USE_TAG = reader.getHFileReader().getFileContext().isIncludesTags();
620    // run the utilities
621    DataBlockEncodingTool comp = new DataBlockEncodingTool(compressionName);
622    int majorVersion = reader.getHFileVersion();
623    comp.useHBaseChecksum = majorVersion > 2 ||
624      (majorVersion == 2 &&
625       reader.getHFileMinorVersion() >= HFileReaderImpl.MINOR_VERSION_WITH_CHECKSUM);
626    comp.checkStatistics(scanner, kvLimit);
627    if (doVerify) {
628      comp.verifyCodecs(scanner, kvLimit);
629    }
630    if (doBenchmark) {
631      comp.benchmarkCodecs();
632    }
633    comp.displayStatistics();
634
635    // cleanup
636    scanner.close();
637    reader.close(cacheConf.shouldEvictOnClose());
638  }
639
640  private static void printUsage(Options options) {
641    System.err.println("Usage:");
642    System.err.println(String.format("./hbase %s <options>",
643        DataBlockEncodingTool.class.getName()));
644    System.err.println("Options:");
645    for (Object it : options.getOptions()) {
646      Option opt = (Option) it;
647      if (opt.hasArg()) {
648        System.err.println(String.format("-%s %s: %s", opt.getOpt(),
649            opt.getArgName(), opt.getDescription()));
650      } else {
651        System.err.println(String.format("-%s: %s", opt.getOpt(),
652            opt.getDescription()));
653      }
654    }
655  }
656
657  /**
658   * A command line interface to benchmarks. Parses command-line arguments and
659   * runs the appropriate benchmarks.
660   * @param args Should have length at least 1 and holds the file path to HFile.
661   * @throws IOException If you specified the wrong file.
662   */
663  public static void main(final String[] args) throws IOException {
664    // set up user arguments
665    Options options = new Options();
666    options.addOption(OPT_HFILE_NAME, true, "HFile to analyse (REQUIRED)");
667    options.getOption(OPT_HFILE_NAME).setArgName("FILENAME");
668    options.addOption(OPT_KV_LIMIT, true,
669        "Maximum number of KeyValues to process. A benchmark stops running " +
670        "after iterating over this many KV pairs.");
671    options.getOption(OPT_KV_LIMIT).setArgName("NUMBER");
672    options.addOption(OPT_MEASURE_THROUGHPUT, false,
673        "Measure read throughput");
674    options.addOption(OPT_OMIT_CORRECTNESS_TEST, false,
675        "Omit corectness tests.");
676    options.addOption(OPT_COMPRESSION_ALGORITHM, true,
677        "What kind of compression algorithm use for comparison.");
678    options.addOption(OPT_BENCHMARK_N_TIMES,
679        true, "Number of times to run each benchmark. Default value: " +
680            DEFAULT_BENCHMARK_N_TIMES);
681    options.addOption(OPT_BENCHMARK_N_OMIT, true,
682        "Number of first runs of every benchmark to exclude from "
683            + "statistics (" + DEFAULT_BENCHMARK_N_OMIT
684            + " by default, so that " + "only the last "
685            + (DEFAULT_BENCHMARK_N_TIMES - DEFAULT_BENCHMARK_N_OMIT)
686            + " times are included in statistics.)");
687
688    // parse arguments
689    CommandLineParser parser = new PosixParser();
690    CommandLine cmd = null;
691    try {
692      cmd = parser.parse(options, args);
693    } catch (ParseException e) {
694      System.err.println("Could not parse arguments!");
695      System.exit(-1);
696      return; // avoid warning
697    }
698
699    int kvLimit = Integer.MAX_VALUE;
700    if (cmd.hasOption(OPT_KV_LIMIT)) {
701      kvLimit = Integer.parseInt(cmd.getOptionValue(OPT_KV_LIMIT));
702      if (kvLimit <= 0) {
703        LOG.error("KV_LIMIT should not less than 1.");
704      }
705    }
706
707    // basic argument sanity checks
708    if (!cmd.hasOption(OPT_HFILE_NAME)) {
709      LOG.error("Please specify HFile name using the " + OPT_HFILE_NAME
710          + " option");
711      printUsage(options);
712      System.exit(-1);
713    }
714
715    String pathName = cmd.getOptionValue(OPT_HFILE_NAME);
716    String compressionName = DEFAULT_COMPRESSION.getName();
717    if (cmd.hasOption(OPT_COMPRESSION_ALGORITHM)) {
718      compressionName =
719          cmd.getOptionValue(OPT_COMPRESSION_ALGORITHM).toLowerCase(Locale.ROOT);
720    }
721    boolean doBenchmark = cmd.hasOption(OPT_MEASURE_THROUGHPUT);
722    boolean doVerify = !cmd.hasOption(OPT_OMIT_CORRECTNESS_TEST);
723
724    if (cmd.hasOption(OPT_BENCHMARK_N_TIMES)) {
725      benchmarkNTimes = Integer.valueOf(cmd.getOptionValue(
726          OPT_BENCHMARK_N_TIMES));
727    }
728    if (cmd.hasOption(OPT_BENCHMARK_N_OMIT)) {
729      benchmarkNOmit =
730          Integer.valueOf(cmd.getOptionValue(OPT_BENCHMARK_N_OMIT));
731    }
732    if (benchmarkNTimes < benchmarkNOmit) {
733      LOG.error("The number of times to run each benchmark ("
734          + benchmarkNTimes
735          + ") must be greater than the number of benchmark runs to exclude "
736          + "from statistics (" + benchmarkNOmit + ")");
737      System.exit(1);
738    }
739    LOG.info("Running benchmark " + benchmarkNTimes + " times. " +
740        "Excluding the first " + benchmarkNOmit + " times from statistics.");
741
742    final Configuration conf = HBaseConfiguration.create();
743    try {
744      testCodecs(conf, kvLimit, pathName, compressionName, doBenchmark,
745          doVerify);
746    } finally {
747      (new CacheConfig(conf)).getBlockCache().shutdown();
748    }
749  }
750
751}