001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.util.Random;
023import java.util.StringTokenizer;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FSDataOutputStream;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.fs.RawLocalFileSystem;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
033import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
034import org.apache.hadoop.hbase.testclassification.IOTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.apache.hadoop.hbase.util.RandomDistribution;
037import org.apache.hadoop.io.BytesWritable;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
047import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
048import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
049import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
050import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
051import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
052import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
053import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
054
055/**
 * Tests the performance of seek operations on an HFile.
057 * <p>
058 * Copied from <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
059 * Remove after tfile is committed and use the tfile version of this class instead.
060 * </p>
061 */
062@Category({ IOTests.class, SmallTests.class })
063public class TestHFileSeek {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestHFileSeek.class);
068
069  private static final byte[] CF = "f1".getBytes();
070  private static final byte[] QUAL = "q1".getBytes();
071  private static final boolean USE_PREAD = true;
072  private MyOptions options;
073  private Configuration conf;
074  private Path path;
075  private FileSystem fs;
076  private NanoTimer timer;
077  private Random rng;
078  private RandomDistribution.DiscreteRNG keyLenGen;
079  private KVGenerator kvGen;
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestHFileSeek.class);
082
083  @Before
084  public void setUp() throws IOException {
085    if (options == null) {
086      options = new MyOptions(new String[0]);
087    }
088
089    conf = new Configuration();
090
091    if (options.useRawFs) {
092      conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
093    }
094
095    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
096    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
097    path = new Path(new Path(options.rootDir), options.file);
098    fs = path.getFileSystem(conf);
099    timer = new NanoTimer(false);
100    rng = new Random(options.seed);
101    keyLenGen = new RandomDistribution.Zipf(new Random(rng.nextLong()), options.minKeyLen,
102      options.maxKeyLen, 1.2);
103    RandomDistribution.DiscreteRNG valLenGen = new RandomDistribution.Flat(
104      new Random(rng.nextLong()), options.minValLength, options.maxValLength);
105    RandomDistribution.DiscreteRNG wordLenGen = new RandomDistribution.Flat(
106      new Random(rng.nextLong()), options.minWordLen, options.maxWordLen);
107    kvGen = new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen, options.dictSize);
108  }
109
110  @After
111  public void tearDown() {
112    try {
113      fs.close();
114    } catch (Exception e) {
115      // Nothing
116    }
117  }
118
119  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs) throws IOException {
120    if (fs.exists(name)) {
121      fs.delete(name, true);
122    }
123    FSDataOutputStream fout = fs.create(name);
124    return fout;
125  }
126
127  private void createTFile() throws IOException {
128    long totalBytes = 0;
129    FSDataOutputStream fout = createFSOutput(path, fs);
130    try {
131      HFileContext context = new HFileContextBuilder().withBlockSize(options.minBlockSize)
132        .withCompression(HFileWriterImpl.compressionByName(options.compress)).build();
133      Writer writer = HFile.getWriterFactoryNoCache(conf).withOutputStream(fout)
134        .withFileContext(context).create();
135      try {
136        BytesWritable key = new BytesWritable();
137        BytesWritable val = new BytesWritable();
138        timer.start();
139        for (long i = 0; true; ++i) {
140          if (i % 1000 == 0) { // test the size for every 1000 rows.
141            if (fs.getFileStatus(path).getLen() >= options.fileSize) {
142              break;
143            }
144          }
145          kvGen.next(key, val, false);
146          byte[] k = new byte[key.getLength()];
147          System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
148          byte[] v = new byte[val.getLength()];
149          System.arraycopy(val.getBytes(), 0, v, 0, key.getLength());
150          KeyValue kv = new KeyValue(k, CF, QUAL, v);
151          writer.append(kv);
152          totalBytes += kv.getKeyLength();
153          totalBytes += kv.getValueLength();
154        }
155        timer.stop();
156      } finally {
157        writer.close();
158      }
159    } finally {
160      fout.close();
161    }
162    double duration = (double) timer.read() / 1000; // in us.
163    long fsize = fs.getFileStatus(path).getLen();
164
165    System.out.printf("time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n", timer.toString(),
166      (double) totalBytes / 1024 / 1024, totalBytes / duration);
167    System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n", timer.toString(),
168      (double) fsize / 1024 / 1024, fsize / duration);
169  }
170
171  public void seekTFile() throws IOException {
172    int miss = 0;
173    long totalBytes = 0;
174    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, path).build();
175    Reader reader = TestHFile.createReaderFromStream(context, new CacheConfig(conf), conf);
176    KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(),
177      ((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen);
178    HFileScanner scanner = reader.getScanner(conf, false, USE_PREAD);
179    BytesWritable key = new BytesWritable();
180    timer.reset();
181    timer.start();
182    for (int i = 0; i < options.seekCount; ++i) {
183      kSampler.next(key);
184      byte[] k = new byte[key.getLength()];
185      System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
186      KeyValue kv = new KeyValue(k, CF, QUAL);
187      if (scanner.seekTo(kv) >= 0) {
188        ByteBuffer bbkey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
189        ByteBuffer bbval = scanner.getValue();
190        totalBytes += bbkey.limit();
191        totalBytes += bbval.limit();
192      } else {
193        ++miss;
194      }
195    }
196    timer.stop();
197    System.out.printf("time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
198      timer.toString(), NanoTimer.nanoTimeToString(timer.read() / options.seekCount),
199      options.seekCount - miss, miss, (double) totalBytes / 1024 / (options.seekCount - miss));
200
201  }
202
203  @Test
204  public void testSeeks() throws IOException {
205    if (options.doCreate()) {
206      createTFile();
207    }
208
209    if (options.doRead()) {
210      seekTFile();
211    }
212
213    if (options.doCreate()) {
214      fs.delete(path, true);
215    }
216  }
217
218  private static class IntegerRange {
219    private final int from, to;
220
221    public IntegerRange(int from, int to) {
222      this.from = from;
223      this.to = to;
224    }
225
226    public static IntegerRange parse(String s) throws ParseException {
227      StringTokenizer st = new StringTokenizer(s, " \t,");
228      if (st.countTokens() != 2) {
229        throw new ParseException("Bad integer specification: " + s);
230      }
231      int from = Integer.parseInt(st.nextToken());
232      int to = Integer.parseInt(st.nextToken());
233      return new IntegerRange(from, to);
234    }
235
236    public int from() {
237      return from;
238    }
239
240    public int to() {
241      return to;
242    }
243  }
244
245  private static class MyOptions {
246    // hard coded constants
247    int dictSize = 1000;
248    int minWordLen = 5;
249    int maxWordLen = 20;
250
251    private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
252    String rootDir = TEST_UTIL.getDataTestDir("TestTFileSeek").toString();
253    String file = "TestTFileSeek";
254    // String compress = "lzo"; DISABLED
255    String compress = "none";
256    int minKeyLen = 10;
257    int maxKeyLen = 50;
258    int minValLength = 1024;
259    int maxValLength = 2 * 1024;
260    int minBlockSize = 1 * 1024 * 1024;
261    int fsOutputBufferSize = 1;
262    int fsInputBufferSize = 0;
263    // Default writing 10MB.
264    long fileSize = 10 * 1024 * 1024;
265    long seekCount = 1000;
266    long trialCount = 1;
267    long seed;
268    boolean useRawFs = false;
269
270    static final int OP_CREATE = 1;
271    static final int OP_READ = 2;
272    int op = OP_CREATE | OP_READ;
273
274    boolean proceed = false;
275
276    public MyOptions(String[] args) {
277      seed = System.nanoTime();
278
279      try {
280        Options opts = buildOptions();
281        CommandLineParser parser = new GnuParser();
282        CommandLine line = parser.parse(opts, args, true);
283        processOptions(line, opts);
284        validateOptions();
285      } catch (ParseException e) {
286        System.out.println(e.getMessage());
287        System.out.println("Try \"--help\" option for details.");
288        setStopProceed();
289      }
290    }
291
292    public boolean proceed() {
293      return proceed;
294    }
295
296    private Options buildOptions() {
297      Option compress = OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz|snappy]")
298        .hasArg().withDescription("compression scheme").create('c');
299
300      Option fileSize = OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB").hasArg()
301        .withDescription("target size of the file (in MB).").create('s');
302
303      Option fsInputBufferSz = OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
304        .hasArg().withDescription("size of the file system input buffer (in bytes).").create('i');
305
306      Option fsOutputBufferSize = OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
307        .hasArg().withDescription("size of the file system output buffer (in bytes).").create('o');
308
309      Option keyLen = OptionBuilder.withLongOpt("key-length").withArgName("min,max").hasArg()
310        .withDescription("the length range of the key (in bytes)").create('k');
311
312      Option valueLen = OptionBuilder.withLongOpt("value-length").withArgName("min,max").hasArg()
313        .withDescription("the length range of the value (in bytes)").create('v');
314
315      Option blockSz = OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
316        .withDescription("minimum block size (in KB)").create('b');
317
318      Option operation = OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
319        .withDescription("action: seek-only, create-only, seek-after-create").create('x');
320
321      Option rootDir = OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
322        .withDescription("specify root directory where files will be created.").create('r');
323
324      Option file = OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
325        .withDescription("specify the file name to be created or read.").create('f');
326
327      Option seekCount = OptionBuilder.withLongOpt("seek").withArgName("count").hasArg()
328        .withDescription("specify how many seek operations we perform (requires -x r or -x rw.")
329        .create('n');
330
331      Option trialCount = OptionBuilder.withLongOpt("trials").withArgName("n").hasArg()
332        .withDescription("specify how many times to run the whole benchmark").create('t');
333
334      Option useRawFs = OptionBuilder.withLongOpt("rawfs")
335        .withDescription("use raw instead of checksummed file system").create();
336
337      Option help = OptionBuilder.withLongOpt("help").hasArg(false)
338        .withDescription("show this screen").create("h");
339
340      return new Options().addOption(compress).addOption(fileSize).addOption(fsInputBufferSz)
341        .addOption(fsOutputBufferSize).addOption(keyLen).addOption(blockSz).addOption(rootDir)
342        .addOption(valueLen).addOption(operation).addOption(seekCount).addOption(file)
343        .addOption(trialCount).addOption(useRawFs).addOption(help);
344
345    }
346
347    private void processOptions(CommandLine line, Options opts) throws ParseException {
348      // --help -h and --version -V must be processed first.
349      if (line.hasOption('h')) {
350        HelpFormatter formatter = new HelpFormatter();
351        System.out.println("TFile and SeqFile benchmark.");
352        System.out.println();
353        formatter.printHelp(100, "java ... TestTFileSeqFileComparison [options]",
354          "\nSupported options:", opts, "");
355        return;
356      }
357
358      if (line.hasOption('c')) {
359        compress = line.getOptionValue('c');
360      }
361
362      if (line.hasOption('d')) {
363        dictSize = Integer.parseInt(line.getOptionValue('d'));
364      }
365
366      if (line.hasOption('s')) {
367        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
368      }
369
370      if (line.hasOption('i')) {
371        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
372      }
373
374      if (line.hasOption('o')) {
375        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
376      }
377
378      if (line.hasOption('n')) {
379        seekCount = Integer.parseInt(line.getOptionValue('n'));
380      }
381
382      if (line.hasOption('t')) {
383        trialCount = Integer.parseInt(line.getOptionValue('t'));
384      }
385
386      if (line.hasOption('k')) {
387        IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
388        minKeyLen = ir.from();
389        maxKeyLen = ir.to();
390      }
391
392      if (line.hasOption('v')) {
393        IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
394        minValLength = ir.from();
395        maxValLength = ir.to();
396      }
397
398      if (line.hasOption('b')) {
399        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
400      }
401
402      if (line.hasOption('r')) {
403        rootDir = line.getOptionValue('r');
404      }
405
406      if (line.hasOption('f')) {
407        file = line.getOptionValue('f');
408      }
409
410      if (line.hasOption('S')) {
411        seed = Long.parseLong(line.getOptionValue('S'));
412      }
413
414      if (line.hasOption('x')) {
415        String strOp = line.getOptionValue('x');
416        if (strOp.equals("r")) {
417          op = OP_READ;
418        } else if (strOp.equals("w")) {
419          op = OP_CREATE;
420        } else if (strOp.equals("rw")) {
421          op = OP_CREATE | OP_READ;
422        } else {
423          throw new ParseException("Unknown action specifier: " + strOp);
424        }
425      }
426
427      useRawFs = line.hasOption("rawfs");
428
429      proceed = true;
430    }
431
432    private void validateOptions() throws ParseException {
433      if (
434        !compress.equals("none") && !compress.equals("lzo") && !compress.equals("gz")
435          && !compress.equals("snappy")
436      ) {
437        throw new ParseException("Unknown compression scheme: " + compress);
438      }
439
440      if (minKeyLen >= maxKeyLen) {
441        throw new ParseException("Max key length must be greater than min key length.");
442      }
443
444      if (minValLength >= maxValLength) {
445        throw new ParseException("Max value length must be greater than min value length.");
446      }
447
448      if (minWordLen >= maxWordLen) {
449        throw new ParseException("Max word length must be greater than min word length.");
450      }
451      return;
452    }
453
454    private void setStopProceed() {
455      proceed = false;
456    }
457
458    public boolean doCreate() {
459      return (op & OP_CREATE) != 0;
460    }
461
462    public boolean doRead() {
463      return (op & OP_READ) != 0;
464    }
465  }
466
467  public static void main(String[] argv) throws IOException {
468    TestHFileSeek testCase = new TestHFileSeek();
469    MyOptions options = new MyOptions(argv);
470
471    if (options.proceed == false) {
472      return;
473    }
474
475    testCase.options = options;
476    for (int i = 0; i < options.trialCount; i++) {
477      LOG.info("Beginning trial " + (i + 1));
478      testCase.setUp();
479      testCase.testSeeks();
480      testCase.tearDown();
481    }
482  }
483
484}