001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.util.Random;
023import java.util.StringTokenizer;
024import junit.framework.TestCase;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FSDataOutputStream;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.fs.RawLocalFileSystem;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
034import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
035import org.apache.hadoop.hbase.testclassification.IOTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.apache.hadoop.io.BytesWritable;
038import org.junit.ClassRule;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
044import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
045import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
046import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
047import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
048import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
049import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
050import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
051
052/**
053 * test the performance for seek.
054 * <p>
055 * Copied from
056 * <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
057 * Remove after tfile is committed and use the tfile version of this class
058 * instead.</p>
059 */
060@Category({IOTests.class, SmallTests.class})
061public class TestHFileSeek extends TestCase {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065      HBaseClassTestRule.forClass(TestHFileSeek.class);
066
067  private static final byte[] CF = "f1".getBytes();
068  private static final byte[] QUAL = "q1".getBytes();
069  private static final boolean USE_PREAD = true;
070  private MyOptions options;
071  private Configuration conf;
072  private Path path;
073  private FileSystem fs;
074  private NanoTimer timer;
075  private Random rng;
076  private RandomDistribution.DiscreteRNG keyLenGen;
077  private KVGenerator kvGen;
078
079  private static final Logger LOG = LoggerFactory.getLogger(TestHFileSeek.class);
080
081  @Override
082  public void setUp() throws IOException {
083    if (options == null) {
084      options = new MyOptions(new String[0]);
085    }
086
087    conf = new Configuration();
088
089    if (options.useRawFs) {
090      conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
091    }
092
093    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
094    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
095    path = new Path(new Path(options.rootDir), options.file);
096    fs = path.getFileSystem(conf);
097    timer = new NanoTimer(false);
098    rng = new Random(options.seed);
099    keyLenGen =
100        new RandomDistribution.Zipf(new Random(rng.nextLong()),
101            options.minKeyLen, options.maxKeyLen, 1.2);
102    RandomDistribution.DiscreteRNG valLenGen =
103        new RandomDistribution.Flat(new Random(rng.nextLong()),
104            options.minValLength, options.maxValLength);
105    RandomDistribution.DiscreteRNG wordLenGen =
106        new RandomDistribution.Flat(new Random(rng.nextLong()),
107            options.minWordLen, options.maxWordLen);
108    kvGen =
109        new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
110            options.dictSize);
111  }
112
113  @Override
114  public void tearDown() {
115    try {
116      fs.close();
117    }
118    catch (Exception e) {
119      // Nothing
120    }
121  }
122
123  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
124    throws IOException {
125    if (fs.exists(name)) {
126      fs.delete(name, true);
127    }
128    FSDataOutputStream fout = fs.create(name);
129    return fout;
130  }
131
132  private void createTFile() throws IOException {
133    long totalBytes = 0;
134    FSDataOutputStream fout = createFSOutput(path, fs);
135    try {
136      HFileContext context = new HFileContextBuilder()
137                            .withBlockSize(options.minBlockSize)
138                            .withCompression(HFileWriterImpl.compressionByName(options.compress))
139                            .build();
140      Writer writer = HFile.getWriterFactoryNoCache(conf)
141          .withOutputStream(fout)
142          .withFileContext(context)
143          .create();
144      try {
145        BytesWritable key = new BytesWritable();
146        BytesWritable val = new BytesWritable();
147        timer.start();
148        for (long i = 0; true; ++i) {
149          if (i % 1000 == 0) { // test the size for every 1000 rows.
150            if (fs.getFileStatus(path).getLen() >= options.fileSize) {
151              break;
152            }
153          }
154          kvGen.next(key, val, false);
155          byte [] k = new byte [key.getLength()];
156          System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
157          byte [] v = new byte [val.getLength()];
158          System.arraycopy(val.getBytes(), 0, v, 0, key.getLength());
159          KeyValue kv = new KeyValue(k, CF, QUAL, v);
160          writer.append(kv);
161          totalBytes += kv.getKeyLength();
162          totalBytes += kv.getValueLength();
163        }
164        timer.stop();
165      }
166      finally {
167        writer.close();
168      }
169    }
170    finally {
171      fout.close();
172    }
173    double duration = (double)timer.read()/1000; // in us.
174    long fsize = fs.getFileStatus(path).getLen();
175
176    System.out.printf(
177        "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n",
178        timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes
179            / duration);
180    System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n",
181        timer.toString(), (double) fsize / 1024 / 1024, fsize / duration);
182  }
183
184  public void seekTFile() throws IOException {
185    int miss = 0;
186    long totalBytes = 0;
187    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, path).build();
188    Reader reader = TestHFile.createReaderFromStream(context, new CacheConfig(conf), conf);
189    KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(),
190        ((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen);
191    HFileScanner scanner = reader.getScanner(false, USE_PREAD);
192    BytesWritable key = new BytesWritable();
193    timer.reset();
194    timer.start();
195    for (int i = 0; i < options.seekCount; ++i) {
196      kSampler.next(key);
197      byte[] k = new byte[key.getLength()];
198      System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
199      KeyValue kv = new KeyValue(k, CF, QUAL);
200      if (scanner.seekTo(kv) >= 0) {
201        ByteBuffer bbkey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
202        ByteBuffer bbval = scanner.getValue();
203        totalBytes += bbkey.limit();
204        totalBytes += bbval.limit();
205      } else {
206        ++miss;
207      }
208    }
209    timer.stop();
210    System.out.printf(
211        "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
212        timer.toString(), NanoTimer.nanoTimeToString(timer.read()
213            / options.seekCount), options.seekCount - miss, miss,
214        (double) totalBytes / 1024 / (options.seekCount - miss));
215
216  }
217
218  public void testSeeks() throws IOException {
219    if (options.doCreate()) {
220      createTFile();
221    }
222
223    if (options.doRead()) {
224      seekTFile();
225    }
226
227    if (options.doCreate()) {
228      fs.delete(path, true);
229    }
230  }
231
232  private static class IntegerRange {
233    private final int from, to;
234
235    public IntegerRange(int from, int to) {
236      this.from = from;
237      this.to = to;
238    }
239
240    public static IntegerRange parse(String s) throws ParseException {
241      StringTokenizer st = new StringTokenizer(s, " \t,");
242      if (st.countTokens() != 2) {
243        throw new ParseException("Bad integer specification: " + s);
244      }
245      int from = Integer.parseInt(st.nextToken());
246      int to = Integer.parseInt(st.nextToken());
247      return new IntegerRange(from, to);
248    }
249
250    public int from() {
251      return from;
252    }
253
254    public int to() {
255      return to;
256    }
257  }
258
259  private static class MyOptions {
260    // hard coded constants
261    int dictSize = 1000;
262    int minWordLen = 5;
263    int maxWordLen = 20;
264
265    private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
266    String rootDir =
267      TEST_UTIL.getDataTestDir("TestTFileSeek").toString();
268    String file = "TestTFileSeek";
269    // String compress = "lzo"; DISABLED
270    String compress = "none";
271    int minKeyLen = 10;
272    int maxKeyLen = 50;
273    int minValLength = 1024;
274    int maxValLength = 2 * 1024;
275    int minBlockSize = 1 * 1024 * 1024;
276    int fsOutputBufferSize = 1;
277    int fsInputBufferSize = 0;
278    // Default writing 10MB.
279    long fileSize = 10 * 1024 * 1024;
280    long seekCount = 1000;
281    long trialCount = 1;
282    long seed;
283    boolean useRawFs = false;
284
285    static final int OP_CREATE = 1;
286    static final int OP_READ = 2;
287    int op = OP_CREATE | OP_READ;
288
289    boolean proceed = false;
290
291    public MyOptions(String[] args) {
292      seed = System.nanoTime();
293
294      try {
295        Options opts = buildOptions();
296        CommandLineParser parser = new GnuParser();
297        CommandLine line = parser.parse(opts, args, true);
298        processOptions(line, opts);
299        validateOptions();
300      }
301      catch (ParseException e) {
302        System.out.println(e.getMessage());
303        System.out.println("Try \"--help\" option for details.");
304        setStopProceed();
305      }
306    }
307
308    public boolean proceed() {
309      return proceed;
310    }
311
312    private Options buildOptions() {
313      Option compress =
314          OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz|snappy]")
315              .hasArg().withDescription("compression scheme").create('c');
316
317      Option fileSize =
318          OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
319              .hasArg().withDescription("target size of the file (in MB).")
320              .create('s');
321
322      Option fsInputBufferSz =
323          OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
324              .hasArg().withDescription(
325                  "size of the file system input buffer (in bytes).").create(
326                  'i');
327
328      Option fsOutputBufferSize =
329          OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
330              .hasArg().withDescription(
331                  "size of the file system output buffer (in bytes).").create(
332                  'o');
333
334      Option keyLen =
335          OptionBuilder
336              .withLongOpt("key-length")
337              .withArgName("min,max")
338              .hasArg()
339              .withDescription(
340                  "the length range of the key (in bytes)")
341              .create('k');
342
343      Option valueLen =
344          OptionBuilder
345              .withLongOpt("value-length")
346              .withArgName("min,max")
347              .hasArg()
348              .withDescription(
349                  "the length range of the value (in bytes)")
350              .create('v');
351
352      Option blockSz =
353          OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
354              .withDescription("minimum block size (in KB)").create('b');
355
356      Option operation =
357          OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
358              .withDescription(
359                  "action: seek-only, create-only, seek-after-create").create(
360                  'x');
361
362      Option rootDir =
363          OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
364              .withDescription(
365                  "specify root directory where files will be created.")
366              .create('r');
367
368      Option file =
369          OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
370              .withDescription("specify the file name to be created or read.")
371              .create('f');
372
373      Option seekCount =
374          OptionBuilder
375              .withLongOpt("seek")
376              .withArgName("count")
377              .hasArg()
378              .withDescription(
379                  "specify how many seek operations we perform (requires -x r or -x rw.")
380              .create('n');
381
382      Option trialCount =
383          OptionBuilder
384              .withLongOpt("trials")
385              .withArgName("n")
386              .hasArg()
387              .withDescription(
388                  "specify how many times to run the whole benchmark")
389              .create('t');
390
391      Option useRawFs =
392          OptionBuilder
393            .withLongOpt("rawfs")
394            .withDescription("use raw instead of checksummed file system")
395            .create();
396
397      Option help =
398          OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
399              "show this screen").create("h");
400
401      return new Options().addOption(compress).addOption(fileSize).addOption(
402          fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen)
403          .addOption(blockSz).addOption(rootDir).addOption(valueLen)
404          .addOption(operation).addOption(seekCount).addOption(file)
405          .addOption(trialCount).addOption(useRawFs).addOption(help);
406
407    }
408
409    private void processOptions(CommandLine line, Options opts)
410        throws ParseException {
411      // --help -h and --version -V must be processed first.
412      if (line.hasOption('h')) {
413        HelpFormatter formatter = new HelpFormatter();
414        System.out.println("TFile and SeqFile benchmark.");
415        System.out.println();
416        formatter.printHelp(100,
417            "java ... TestTFileSeqFileComparison [options]",
418            "\nSupported options:", opts, "");
419        return;
420      }
421
422      if (line.hasOption('c')) {
423        compress = line.getOptionValue('c');
424      }
425
426      if (line.hasOption('d')) {
427        dictSize = Integer.parseInt(line.getOptionValue('d'));
428      }
429
430      if (line.hasOption('s')) {
431        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
432      }
433
434      if (line.hasOption('i')) {
435        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
436      }
437
438      if (line.hasOption('o')) {
439        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
440      }
441
442      if (line.hasOption('n')) {
443        seekCount = Integer.parseInt(line.getOptionValue('n'));
444      }
445
446      if (line.hasOption('t')) {
447        trialCount = Integer.parseInt(line.getOptionValue('t'));
448      }
449
450      if (line.hasOption('k')) {
451        IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
452        minKeyLen = ir.from();
453        maxKeyLen = ir.to();
454      }
455
456      if (line.hasOption('v')) {
457        IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
458        minValLength = ir.from();
459        maxValLength = ir.to();
460      }
461
462      if (line.hasOption('b')) {
463        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
464      }
465
466      if (line.hasOption('r')) {
467        rootDir = line.getOptionValue('r');
468      }
469
470      if (line.hasOption('f')) {
471        file = line.getOptionValue('f');
472      }
473
474      if (line.hasOption('S')) {
475        seed = Long.parseLong(line.getOptionValue('S'));
476      }
477
478      if (line.hasOption('x')) {
479        String strOp = line.getOptionValue('x');
480        if (strOp.equals("r")) {
481          op = OP_READ;
482        }
483        else if (strOp.equals("w")) {
484          op = OP_CREATE;
485        }
486        else if (strOp.equals("rw")) {
487          op = OP_CREATE | OP_READ;
488        }
489        else {
490          throw new ParseException("Unknown action specifier: " + strOp);
491        }
492      }
493
494      useRawFs = line.hasOption("rawfs");
495
496      proceed = true;
497    }
498
499    private void validateOptions() throws ParseException {
500      if (!compress.equals("none") && !compress.equals("lzo")
501          && !compress.equals("gz") && !compress.equals("snappy")) {
502        throw new ParseException("Unknown compression scheme: " + compress);
503      }
504
505      if (minKeyLen >= maxKeyLen) {
506        throw new ParseException(
507            "Max key length must be greater than min key length.");
508      }
509
510      if (minValLength >= maxValLength) {
511        throw new ParseException(
512            "Max value length must be greater than min value length.");
513      }
514
515      if (minWordLen >= maxWordLen) {
516        throw new ParseException(
517            "Max word length must be greater than min word length.");
518      }
519      return;
520    }
521
522    private void setStopProceed() {
523      proceed = false;
524    }
525
526    public boolean doCreate() {
527      return (op & OP_CREATE) != 0;
528    }
529
530    public boolean doRead() {
531      return (op & OP_READ) != 0;
532    }
533  }
534
535  public static void main(String[] argv) throws IOException {
536    TestHFileSeek testCase = new TestHFileSeek();
537    MyOptions options = new MyOptions(argv);
538
539    if (options.proceed == false) {
540      return;
541    }
542
543    testCase.options = options;
544    for (int i = 0; i < options.trialCount; i++) {
545      LOG.info("Beginning trial " + (i+1));
546      testCase.setUp();
547      testCase.testSeeks();
548      testCase.tearDown();
549    }
550  }
551
552}
553