001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.util.Random;
023import java.util.StringTokenizer;
024import junit.framework.TestCase;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FSDataInputStream;
027import org.apache.hadoop.fs.FSDataOutputStream;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.fs.RawLocalFileSystem;
031import org.apache.hadoop.hbase.CellComparatorImpl;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
036import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
037import org.apache.hadoop.hbase.testclassification.IOTests;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.io.BytesWritable;
041import org.junit.ClassRule;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
047import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
048import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
049import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
050import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
051import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
052import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
053import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
054
055/**
 * Tests the performance of seek operations on an HFile.
057 * <p>
058 * Copied from
059 * <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
060 * Remove after tfile is committed and use the tfile version of this class
061 * instead.</p>
062 */
063@Category({IOTests.class, MediumTests.class})
064public class TestHFileSeek extends TestCase {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestHFileSeek.class);
069
070  private static final byte[] CF = Bytes.toBytes("f1");
071  private static final byte[] QUAL = Bytes.toBytes("q1");
072  private static final boolean USE_PREAD = true;
073  private MyOptions options;
074  private Configuration conf;
075  private Path path;
076  private FileSystem fs;
077  private NanoTimer timer;
078  private Random rng;
079  private RandomDistribution.DiscreteRNG keyLenGen;
080  private KVGenerator kvGen;
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestHFileSeek.class);
083
084  @Override
085  public void setUp() throws IOException {
086    if (options == null) {
087      options = new MyOptions(new String[0]);
088    }
089
090    conf = new Configuration();
091
092    if (options.useRawFs) {
093      conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
094    }
095
096    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
097    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
098    path = new Path(new Path(options.rootDir), options.file);
099    fs = path.getFileSystem(conf);
100    timer = new NanoTimer(false);
101    rng = new Random(options.seed);
102    keyLenGen =
103        new RandomDistribution.Zipf(new Random(rng.nextLong()),
104            options.minKeyLen, options.maxKeyLen, 1.2);
105    RandomDistribution.DiscreteRNG valLenGen =
106        new RandomDistribution.Flat(new Random(rng.nextLong()),
107            options.minValLength, options.maxValLength);
108    RandomDistribution.DiscreteRNG wordLenGen =
109        new RandomDistribution.Flat(new Random(rng.nextLong()),
110            options.minWordLen, options.maxWordLen);
111    kvGen =
112        new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
113            options.dictSize);
114  }
115
116  @Override
117  public void tearDown() {
118    try {
119      fs.close();
120    }
121    catch (Exception e) {
122      // Nothing
123    }
124  }
125
126  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
127    throws IOException {
128    if (fs.exists(name)) {
129      fs.delete(name, true);
130    }
131    FSDataOutputStream fout = fs.create(name);
132    return fout;
133  }
134
135  private void createTFile() throws IOException {
136    long totalBytes = 0;
137    FSDataOutputStream fout = createFSOutput(path, fs);
138    try {
139      HFileContext context = new HFileContextBuilder()
140                            .withBlockSize(options.minBlockSize)
141                            .withCompression(HFileWriterImpl.compressionByName(options.compress))
142                            .build();
143      Writer writer = HFile.getWriterFactoryNoCache(conf)
144          .withOutputStream(fout)
145          .withFileContext(context)
146          .withComparator(CellComparatorImpl.COMPARATOR)
147          .create();
148      try {
149        BytesWritable key = new BytesWritable();
150        BytesWritable val = new BytesWritable();
151        timer.start();
152        for (long i = 0; true; ++i) {
153          if (i % 1000 == 0) { // test the size for every 1000 rows.
154            if (fs.getFileStatus(path).getLen() >= options.fileSize) {
155              break;
156            }
157          }
158          kvGen.next(key, val, false);
159          byte [] k = new byte [key.getLength()];
160          System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
161          byte [] v = new byte [val.getLength()];
162          System.arraycopy(val.getBytes(), 0, v, 0, key.getLength());
163          KeyValue kv = new KeyValue(k, CF, QUAL, v);
164          writer.append(kv);
165          totalBytes += kv.getKeyLength();
166          totalBytes += kv.getValueLength();
167        }
168        timer.stop();
169      }
170      finally {
171        writer.close();
172      }
173    }
174    finally {
175      fout.close();
176    }
177    double duration = (double)timer.read()/1000; // in us.
178    long fsize = fs.getFileStatus(path).getLen();
179
180    System.out.printf(
181        "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n",
182        timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes
183            / duration);
184    System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n",
185        timer.toString(), (double) fsize / 1024 / 1024, fsize / duration);
186  }
187
188  public void seekTFile() throws IOException {
189    int miss = 0;
190    long totalBytes = 0;
191    FSDataInputStream fsdis = fs.open(path);
192    Reader reader = HFile.createReaderFromStream(path, fsdis,
193        fs.getFileStatus(path).getLen(), new CacheConfig(conf), conf);
194    reader.loadFileInfo();
195    KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(),
196        ((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen);
197    HFileScanner scanner = reader.getScanner(false, USE_PREAD);
198    BytesWritable key = new BytesWritable();
199    timer.reset();
200    timer.start();
201    for (int i = 0; i < options.seekCount; ++i) {
202      kSampler.next(key);
203      byte[] k = new byte[key.getLength()];
204      System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
205      KeyValue kv = new KeyValue(k, CF, QUAL);
206      if (scanner.seekTo(kv) >= 0) {
207        ByteBuffer bbkey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
208        ByteBuffer bbval = scanner.getValue();
209        totalBytes += bbkey.limit();
210        totalBytes += bbval.limit();
211      } else {
212        ++miss;
213      }
214    }
215    timer.stop();
216    System.out.printf(
217        "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
218        timer.toString(), NanoTimer.nanoTimeToString(timer.read()
219            / options.seekCount), options.seekCount - miss, miss,
220        (double) totalBytes / 1024 / (options.seekCount - miss));
221
222  }
223
224  public void testSeeks() throws IOException {
225    if (options.doCreate()) {
226      createTFile();
227    }
228
229    if (options.doRead()) {
230      seekTFile();
231    }
232
233    if (options.doCreate()) {
234      fs.delete(path, true);
235    }
236  }
237
238  private static class IntegerRange {
239    private final int from, to;
240
241    public IntegerRange(int from, int to) {
242      this.from = from;
243      this.to = to;
244    }
245
246    public static IntegerRange parse(String s) throws ParseException {
247      StringTokenizer st = new StringTokenizer(s, " \t,");
248      if (st.countTokens() != 2) {
249        throw new ParseException("Bad integer specification: " + s);
250      }
251      int from = Integer.parseInt(st.nextToken());
252      int to = Integer.parseInt(st.nextToken());
253      return new IntegerRange(from, to);
254    }
255
256    public int from() {
257      return from;
258    }
259
260    public int to() {
261      return to;
262    }
263  }
264
265  private static class MyOptions {
266    // hard coded constants
267    int dictSize = 1000;
268    int minWordLen = 5;
269    int maxWordLen = 20;
270
271    private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
272    String rootDir =
273      TEST_UTIL.getDataTestDir("TestTFileSeek").toString();
274    String file = "TestTFileSeek";
275    // String compress = "lzo"; DISABLED
276    String compress = "none";
277    int minKeyLen = 10;
278    int maxKeyLen = 50;
279    int minValLength = 1024;
280    int maxValLength = 2 * 1024;
281    int minBlockSize = 1 * 1024 * 1024;
282    int fsOutputBufferSize = 1;
283    int fsInputBufferSize = 0;
284    // Default writing 10MB.
285    long fileSize = 10 * 1024 * 1024;
286    long seekCount = 1000;
287    long trialCount = 1;
288    long seed;
289    boolean useRawFs = false;
290
291    static final int OP_CREATE = 1;
292    static final int OP_READ = 2;
293    int op = OP_CREATE | OP_READ;
294
295    boolean proceed = false;
296
297    public MyOptions(String[] args) {
298      seed = System.nanoTime();
299
300      try {
301        Options opts = buildOptions();
302        CommandLineParser parser = new GnuParser();
303        CommandLine line = parser.parse(opts, args, true);
304        processOptions(line, opts);
305        validateOptions();
306      }
307      catch (ParseException e) {
308        System.out.println(e.getMessage());
309        System.out.println("Try \"--help\" option for details.");
310        setStopProceed();
311      }
312    }
313
314    public boolean proceed() {
315      return proceed;
316    }
317
318    private Options buildOptions() {
319      Option compress =
320          OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz|snappy]")
321              .hasArg().withDescription("compression scheme").create('c');
322
323      Option fileSize =
324          OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
325              .hasArg().withDescription("target size of the file (in MB).")
326              .create('s');
327
328      Option fsInputBufferSz =
329          OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
330              .hasArg().withDescription(
331                  "size of the file system input buffer (in bytes).").create(
332                  'i');
333
334      Option fsOutputBufferSize =
335          OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
336              .hasArg().withDescription(
337                  "size of the file system output buffer (in bytes).").create(
338                  'o');
339
340      Option keyLen =
341          OptionBuilder
342              .withLongOpt("key-length")
343              .withArgName("min,max")
344              .hasArg()
345              .withDescription(
346                  "the length range of the key (in bytes)")
347              .create('k');
348
349      Option valueLen =
350          OptionBuilder
351              .withLongOpt("value-length")
352              .withArgName("min,max")
353              .hasArg()
354              .withDescription(
355                  "the length range of the value (in bytes)")
356              .create('v');
357
358      Option blockSz =
359          OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
360              .withDescription("minimum block size (in KB)").create('b');
361
362      Option operation =
363          OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
364              .withDescription(
365                  "action: seek-only, create-only, seek-after-create").create(
366                  'x');
367
368      Option rootDir =
369          OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
370              .withDescription(
371                  "specify root directory where files will be created.")
372              .create('r');
373
374      Option file =
375          OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
376              .withDescription("specify the file name to be created or read.")
377              .create('f');
378
379      Option seekCount =
380          OptionBuilder
381              .withLongOpt("seek")
382              .withArgName("count")
383              .hasArg()
384              .withDescription(
385                  "specify how many seek operations we perform (requires -x r or -x rw.")
386              .create('n');
387
388      Option trialCount =
389          OptionBuilder
390              .withLongOpt("trials")
391              .withArgName("n")
392              .hasArg()
393              .withDescription(
394                  "specify how many times to run the whole benchmark")
395              .create('t');
396
397      Option useRawFs =
398          OptionBuilder
399            .withLongOpt("rawfs")
400            .withDescription("use raw instead of checksummed file system")
401            .create();
402
403      Option help =
404          OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
405              "show this screen").create("h");
406
407      return new Options().addOption(compress).addOption(fileSize).addOption(
408          fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen)
409          .addOption(blockSz).addOption(rootDir).addOption(valueLen)
410          .addOption(operation).addOption(seekCount).addOption(file)
411          .addOption(trialCount).addOption(useRawFs).addOption(help);
412
413    }
414
415    private void processOptions(CommandLine line, Options opts)
416        throws ParseException {
417      // --help -h and --version -V must be processed first.
418      if (line.hasOption('h')) {
419        HelpFormatter formatter = new HelpFormatter();
420        System.out.println("TFile and SeqFile benchmark.");
421        System.out.println();
422        formatter.printHelp(100,
423            "java ... TestTFileSeqFileComparison [options]",
424            "\nSupported options:", opts, "");
425        return;
426      }
427
428      if (line.hasOption('c')) {
429        compress = line.getOptionValue('c');
430      }
431
432      if (line.hasOption('d')) {
433        dictSize = Integer.parseInt(line.getOptionValue('d'));
434      }
435
436      if (line.hasOption('s')) {
437        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
438      }
439
440      if (line.hasOption('i')) {
441        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
442      }
443
444      if (line.hasOption('o')) {
445        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
446      }
447
448      if (line.hasOption('n')) {
449        seekCount = Integer.parseInt(line.getOptionValue('n'));
450      }
451
452      if (line.hasOption('t')) {
453        trialCount = Integer.parseInt(line.getOptionValue('t'));
454      }
455
456      if (line.hasOption('k')) {
457        IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
458        minKeyLen = ir.from();
459        maxKeyLen = ir.to();
460      }
461
462      if (line.hasOption('v')) {
463        IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
464        minValLength = ir.from();
465        maxValLength = ir.to();
466      }
467
468      if (line.hasOption('b')) {
469        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
470      }
471
472      if (line.hasOption('r')) {
473        rootDir = line.getOptionValue('r');
474      }
475
476      if (line.hasOption('f')) {
477        file = line.getOptionValue('f');
478      }
479
480      if (line.hasOption('S')) {
481        seed = Long.parseLong(line.getOptionValue('S'));
482      }
483
484      if (line.hasOption('x')) {
485        String strOp = line.getOptionValue('x');
486        if (strOp.equals("r")) {
487          op = OP_READ;
488        }
489        else if (strOp.equals("w")) {
490          op = OP_CREATE;
491        }
492        else if (strOp.equals("rw")) {
493          op = OP_CREATE | OP_READ;
494        }
495        else {
496          throw new ParseException("Unknown action specifier: " + strOp);
497        }
498      }
499
500      useRawFs = line.hasOption("rawfs");
501
502      proceed = true;
503    }
504
505    private void validateOptions() throws ParseException {
506      if (!compress.equals("none") && !compress.equals("lzo")
507          && !compress.equals("gz") && !compress.equals("snappy")) {
508        throw new ParseException("Unknown compression scheme: " + compress);
509      }
510
511      if (minKeyLen >= maxKeyLen) {
512        throw new ParseException(
513            "Max key length must be greater than min key length.");
514      }
515
516      if (minValLength >= maxValLength) {
517        throw new ParseException(
518            "Max value length must be greater than min value length.");
519      }
520
521      if (minWordLen >= maxWordLen) {
522        throw new ParseException(
523            "Max word length must be greater than min word length.");
524      }
525      return;
526    }
527
528    private void setStopProceed() {
529      proceed = false;
530    }
531
532    public boolean doCreate() {
533      return (op & OP_CREATE) != 0;
534    }
535
536    public boolean doRead() {
537      return (op & OP_READ) != 0;
538    }
539  }
540
541  public static void main(String[] argv) throws IOException {
542    TestHFileSeek testCase = new TestHFileSeek();
543    MyOptions options = new MyOptions(argv);
544
545    if (options.proceed == false) {
546      return;
547    }
548
549    testCase.options = options;
550    for (int i = 0; i < options.trialCount; i++) {
551      LOG.info("Beginning trial " + (i+1));
552      testCase.setUp();
553      testCase.testSeeks();
554      testCase.tearDown();
555    }
556  }
557
558}
559