001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.util.Random;
023import java.util.StringTokenizer;
024import junit.framework.TestCase;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FSDataInputStream;
027import org.apache.hadoop.fs.FSDataOutputStream;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.fs.RawLocalFileSystem;
031import org.apache.hadoop.hbase.CellComparatorImpl;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
036import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
037import org.apache.hadoop.hbase.testclassification.IOTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.io.BytesWritable;
040import org.junit.ClassRule;
041import org.junit.experimental.categories.Category;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
046import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
047import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
048import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
049import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
050import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
051import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
052import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
053
054/**
 * Benchmarks the performance of seeks against an HFile.
056 * <p>
057 * Copied from
058 * <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
059 * Remove after tfile is committed and use the tfile version of this class
060 * instead.</p>
061 */
062@Category({IOTests.class, MediumTests.class})
063public class TestHFileSeek extends TestCase {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestHFileSeek.class);
068
069  private static final byte[] CF = "f1".getBytes();
070  private static final byte[] QUAL = "q1".getBytes();
071  private static final boolean USE_PREAD = true;
072  private MyOptions options;
073  private Configuration conf;
074  private Path path;
075  private FileSystem fs;
076  private NanoTimer timer;
077  private Random rng;
078  private RandomDistribution.DiscreteRNG keyLenGen;
079  private KVGenerator kvGen;
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestHFileSeek.class);
082
083  @Override
084  public void setUp() throws IOException {
085    if (options == null) {
086      options = new MyOptions(new String[0]);
087    }
088
089    conf = new Configuration();
090
091    if (options.useRawFs) {
092      conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
093    }
094
095    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
096    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
097    path = new Path(new Path(options.rootDir), options.file);
098    fs = path.getFileSystem(conf);
099    timer = new NanoTimer(false);
100    rng = new Random(options.seed);
101    keyLenGen =
102        new RandomDistribution.Zipf(new Random(rng.nextLong()),
103            options.minKeyLen, options.maxKeyLen, 1.2);
104    RandomDistribution.DiscreteRNG valLenGen =
105        new RandomDistribution.Flat(new Random(rng.nextLong()),
106            options.minValLength, options.maxValLength);
107    RandomDistribution.DiscreteRNG wordLenGen =
108        new RandomDistribution.Flat(new Random(rng.nextLong()),
109            options.minWordLen, options.maxWordLen);
110    kvGen =
111        new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
112            options.dictSize);
113  }
114
115  @Override
116  public void tearDown() {
117    try {
118      fs.close();
119    }
120    catch (Exception e) {
121      // Nothing
122    }
123  }
124
125  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
126    throws IOException {
127    if (fs.exists(name)) {
128      fs.delete(name, true);
129    }
130    FSDataOutputStream fout = fs.create(name);
131    return fout;
132  }
133
134  private void createTFile() throws IOException {
135    long totalBytes = 0;
136    FSDataOutputStream fout = createFSOutput(path, fs);
137    try {
138      HFileContext context = new HFileContextBuilder()
139                            .withBlockSize(options.minBlockSize)
140                            .withCompression(HFileWriterImpl.compressionByName(options.compress))
141                            .build();
142      Writer writer = HFile.getWriterFactoryNoCache(conf)
143          .withOutputStream(fout)
144          .withFileContext(context)
145          .withComparator(CellComparatorImpl.COMPARATOR)
146          .create();
147      try {
148        BytesWritable key = new BytesWritable();
149        BytesWritable val = new BytesWritable();
150        timer.start();
151        for (long i = 0; true; ++i) {
152          if (i % 1000 == 0) { // test the size for every 1000 rows.
153            if (fs.getFileStatus(path).getLen() >= options.fileSize) {
154              break;
155            }
156          }
157          kvGen.next(key, val, false);
158          byte [] k = new byte [key.getLength()];
159          System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
160          byte [] v = new byte [val.getLength()];
161          System.arraycopy(val.getBytes(), 0, v, 0, key.getLength());
162          KeyValue kv = new KeyValue(k, CF, QUAL, v);
163          writer.append(kv);
164          totalBytes += kv.getKeyLength();
165          totalBytes += kv.getValueLength();
166        }
167        timer.stop();
168      }
169      finally {
170        writer.close();
171      }
172    }
173    finally {
174      fout.close();
175    }
176    double duration = (double)timer.read()/1000; // in us.
177    long fsize = fs.getFileStatus(path).getLen();
178
179    System.out.printf(
180        "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n",
181        timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes
182            / duration);
183    System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n",
184        timer.toString(), (double) fsize / 1024 / 1024, fsize / duration);
185  }
186
187  public void seekTFile() throws IOException {
188    int miss = 0;
189    long totalBytes = 0;
190    FSDataInputStream fsdis = fs.open(path);
191    Reader reader = HFile.createReaderFromStream(path, fsdis,
192        fs.getFileStatus(path).getLen(), new CacheConfig(conf), conf);
193    reader.loadFileInfo();
194    KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(),
195        ((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen);
196    HFileScanner scanner = reader.getScanner(false, USE_PREAD);
197    BytesWritable key = new BytesWritable();
198    timer.reset();
199    timer.start();
200    for (int i = 0; i < options.seekCount; ++i) {
201      kSampler.next(key);
202      byte[] k = new byte[key.getLength()];
203      System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
204      KeyValue kv = new KeyValue(k, CF, QUAL);
205      if (scanner.seekTo(kv) >= 0) {
206        ByteBuffer bbkey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
207        ByteBuffer bbval = scanner.getValue();
208        totalBytes += bbkey.limit();
209        totalBytes += bbval.limit();
210      } else {
211        ++miss;
212      }
213    }
214    timer.stop();
215    System.out.printf(
216        "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
217        timer.toString(), NanoTimer.nanoTimeToString(timer.read()
218            / options.seekCount), options.seekCount - miss, miss,
219        (double) totalBytes / 1024 / (options.seekCount - miss));
220
221  }
222
223  public void testSeeks() throws IOException {
224    if (options.doCreate()) {
225      createTFile();
226    }
227
228    if (options.doRead()) {
229      seekTFile();
230    }
231
232    if (options.doCreate()) {
233      fs.delete(path, true);
234    }
235  }
236
237  private static class IntegerRange {
238    private final int from, to;
239
240    public IntegerRange(int from, int to) {
241      this.from = from;
242      this.to = to;
243    }
244
245    public static IntegerRange parse(String s) throws ParseException {
246      StringTokenizer st = new StringTokenizer(s, " \t,");
247      if (st.countTokens() != 2) {
248        throw new ParseException("Bad integer specification: " + s);
249      }
250      int from = Integer.parseInt(st.nextToken());
251      int to = Integer.parseInt(st.nextToken());
252      return new IntegerRange(from, to);
253    }
254
255    public int from() {
256      return from;
257    }
258
259    public int to() {
260      return to;
261    }
262  }
263
264  private static class MyOptions {
265    // hard coded constants
266    int dictSize = 1000;
267    int minWordLen = 5;
268    int maxWordLen = 20;
269
270    private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
271    String rootDir =
272      TEST_UTIL.getDataTestDir("TestTFileSeek").toString();
273    String file = "TestTFileSeek";
274    // String compress = "lzo"; DISABLED
275    String compress = "none";
276    int minKeyLen = 10;
277    int maxKeyLen = 50;
278    int minValLength = 1024;
279    int maxValLength = 2 * 1024;
280    int minBlockSize = 1 * 1024 * 1024;
281    int fsOutputBufferSize = 1;
282    int fsInputBufferSize = 0;
283    // Default writing 10MB.
284    long fileSize = 10 * 1024 * 1024;
285    long seekCount = 1000;
286    long trialCount = 1;
287    long seed;
288    boolean useRawFs = false;
289
290    static final int OP_CREATE = 1;
291    static final int OP_READ = 2;
292    int op = OP_CREATE | OP_READ;
293
294    boolean proceed = false;
295
296    public MyOptions(String[] args) {
297      seed = System.nanoTime();
298
299      try {
300        Options opts = buildOptions();
301        CommandLineParser parser = new GnuParser();
302        CommandLine line = parser.parse(opts, args, true);
303        processOptions(line, opts);
304        validateOptions();
305      }
306      catch (ParseException e) {
307        System.out.println(e.getMessage());
308        System.out.println("Try \"--help\" option for details.");
309        setStopProceed();
310      }
311    }
312
313    public boolean proceed() {
314      return proceed;
315    }
316
317    private Options buildOptions() {
318      Option compress =
319          OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz|snappy]")
320              .hasArg().withDescription("compression scheme").create('c');
321
322      Option fileSize =
323          OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
324              .hasArg().withDescription("target size of the file (in MB).")
325              .create('s');
326
327      Option fsInputBufferSz =
328          OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
329              .hasArg().withDescription(
330                  "size of the file system input buffer (in bytes).").create(
331                  'i');
332
333      Option fsOutputBufferSize =
334          OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
335              .hasArg().withDescription(
336                  "size of the file system output buffer (in bytes).").create(
337                  'o');
338
339      Option keyLen =
340          OptionBuilder
341              .withLongOpt("key-length")
342              .withArgName("min,max")
343              .hasArg()
344              .withDescription(
345                  "the length range of the key (in bytes)")
346              .create('k');
347
348      Option valueLen =
349          OptionBuilder
350              .withLongOpt("value-length")
351              .withArgName("min,max")
352              .hasArg()
353              .withDescription(
354                  "the length range of the value (in bytes)")
355              .create('v');
356
357      Option blockSz =
358          OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
359              .withDescription("minimum block size (in KB)").create('b');
360
361      Option operation =
362          OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
363              .withDescription(
364                  "action: seek-only, create-only, seek-after-create").create(
365                  'x');
366
367      Option rootDir =
368          OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
369              .withDescription(
370                  "specify root directory where files will be created.")
371              .create('r');
372
373      Option file =
374          OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
375              .withDescription("specify the file name to be created or read.")
376              .create('f');
377
378      Option seekCount =
379          OptionBuilder
380              .withLongOpt("seek")
381              .withArgName("count")
382              .hasArg()
383              .withDescription(
384                  "specify how many seek operations we perform (requires -x r or -x rw.")
385              .create('n');
386
387      Option trialCount =
388          OptionBuilder
389              .withLongOpt("trials")
390              .withArgName("n")
391              .hasArg()
392              .withDescription(
393                  "specify how many times to run the whole benchmark")
394              .create('t');
395
396      Option useRawFs =
397          OptionBuilder
398            .withLongOpt("rawfs")
399            .withDescription("use raw instead of checksummed file system")
400            .create();
401
402      Option help =
403          OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
404              "show this screen").create("h");
405
406      return new Options().addOption(compress).addOption(fileSize).addOption(
407          fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen)
408          .addOption(blockSz).addOption(rootDir).addOption(valueLen)
409          .addOption(operation).addOption(seekCount).addOption(file)
410          .addOption(trialCount).addOption(useRawFs).addOption(help);
411
412    }
413
414    private void processOptions(CommandLine line, Options opts)
415        throws ParseException {
416      // --help -h and --version -V must be processed first.
417      if (line.hasOption('h')) {
418        HelpFormatter formatter = new HelpFormatter();
419        System.out.println("TFile and SeqFile benchmark.");
420        System.out.println();
421        formatter.printHelp(100,
422            "java ... TestTFileSeqFileComparison [options]",
423            "\nSupported options:", opts, "");
424        return;
425      }
426
427      if (line.hasOption('c')) {
428        compress = line.getOptionValue('c');
429      }
430
431      if (line.hasOption('d')) {
432        dictSize = Integer.parseInt(line.getOptionValue('d'));
433      }
434
435      if (line.hasOption('s')) {
436        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
437      }
438
439      if (line.hasOption('i')) {
440        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
441      }
442
443      if (line.hasOption('o')) {
444        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
445      }
446
447      if (line.hasOption('n')) {
448        seekCount = Integer.parseInt(line.getOptionValue('n'));
449      }
450
451      if (line.hasOption('t')) {
452        trialCount = Integer.parseInt(line.getOptionValue('t'));
453      }
454
455      if (line.hasOption('k')) {
456        IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
457        minKeyLen = ir.from();
458        maxKeyLen = ir.to();
459      }
460
461      if (line.hasOption('v')) {
462        IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
463        minValLength = ir.from();
464        maxValLength = ir.to();
465      }
466
467      if (line.hasOption('b')) {
468        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
469      }
470
471      if (line.hasOption('r')) {
472        rootDir = line.getOptionValue('r');
473      }
474
475      if (line.hasOption('f')) {
476        file = line.getOptionValue('f');
477      }
478
479      if (line.hasOption('S')) {
480        seed = Long.parseLong(line.getOptionValue('S'));
481      }
482
483      if (line.hasOption('x')) {
484        String strOp = line.getOptionValue('x');
485        if (strOp.equals("r")) {
486          op = OP_READ;
487        }
488        else if (strOp.equals("w")) {
489          op = OP_CREATE;
490        }
491        else if (strOp.equals("rw")) {
492          op = OP_CREATE | OP_READ;
493        }
494        else {
495          throw new ParseException("Unknown action specifier: " + strOp);
496        }
497      }
498
499      useRawFs = line.hasOption("rawfs");
500
501      proceed = true;
502    }
503
504    private void validateOptions() throws ParseException {
505      if (!compress.equals("none") && !compress.equals("lzo")
506          && !compress.equals("gz") && !compress.equals("snappy")) {
507        throw new ParseException("Unknown compression scheme: " + compress);
508      }
509
510      if (minKeyLen >= maxKeyLen) {
511        throw new ParseException(
512            "Max key length must be greater than min key length.");
513      }
514
515      if (minValLength >= maxValLength) {
516        throw new ParseException(
517            "Max value length must be greater than min value length.");
518      }
519
520      if (minWordLen >= maxWordLen) {
521        throw new ParseException(
522            "Max word length must be greater than min word length.");
523      }
524      return;
525    }
526
527    private void setStopProceed() {
528      proceed = false;
529    }
530
531    public boolean doCreate() {
532      return (op & OP_CREATE) != 0;
533    }
534
535    public boolean doRead() {
536      return (op & OP_READ) != 0;
537    }
538  }
539
540  public static void main(String[] argv) throws IOException {
541    TestHFileSeek testCase = new TestHFileSeek();
542    MyOptions options = new MyOptions(argv);
543
544    if (options.proceed == false) {
545      return;
546    }
547
548    testCase.options = options;
549    for (int i = 0; i < options.trialCount; i++) {
550      LOG.info("Beginning trial " + (i+1));
551      testCase.setUp();
552      testCase.testSeeks();
553      testCase.tearDown();
554    }
555  }
556
557}
558