001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.util.Random;
023import java.util.StringTokenizer;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FSDataOutputStream;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.fs.RawLocalFileSystem;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
033import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
034import org.apache.hadoop.hbase.testclassification.IOTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.RandomDistribution;
038import org.apache.hadoop.io.BytesWritable;
039import org.junit.After;
040import org.junit.Before;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
048import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
049import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
050import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
051import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
052import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
053import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
054import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
055
/**
 * Tests the performance of seek operations on an HFile.
 * <p>
 * Copied from <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
 * Remove after tfile is committed and use the tfile version of this class instead.
 * </p>
 */
063@Category({ IOTests.class, SmallTests.class })
064public class TestHFileSeek {
065
  // JUnit class rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileSeek.class);

  // Column family and qualifier used for every KeyValue built by this test.
  private static final byte[] CF = Bytes.toBytes("f1");
  private static final byte[] QUAL = Bytes.toBytes("q1");
  // Passed as the last argument of reader.getScanner(...) in seekTFile().
  private static final boolean USE_PREAD = true;
  // Benchmark settings; main() assigns them, otherwise setUp() creates defaults.
  private MyOptions options;
  // The remaining instance fields are (re)initialized by setUp() for every trial.
  private Configuration conf;
  // Location of the benchmark file: options.file under options.rootDir.
  private Path path;
  // File system resolved from path; closed in tearDown().
  private FileSystem fs;
  // Measures the write phase (createTFile) and the seek phase (seekTFile).
  private NanoTimer timer;
  // Master random generator, seeded with options.seed.
  private Random rng;
  // Key length distribution; used by kvGen and by the KeySampler in seekTFile().
  private RandomDistribution.DiscreteRNG keyLenGen;
  // Produces the key/value pairs written by createTFile().
  private KVGenerator kvGen;

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileSeek.class);
083
084  @Before
085  public void setUp() throws IOException {
086    if (options == null) {
087      options = new MyOptions(new String[0]);
088    }
089
090    conf = new Configuration();
091
092    if (options.useRawFs) {
093      conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
094    }
095
096    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
097    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
098    path = new Path(new Path(options.rootDir), options.file);
099    fs = path.getFileSystem(conf);
100    timer = new NanoTimer(false);
101    rng = new Random(options.seed);
102    keyLenGen = new RandomDistribution.Zipf(new Random(rng.nextLong()), options.minKeyLen,
103      options.maxKeyLen, 1.2);
104    RandomDistribution.DiscreteRNG valLenGen = new RandomDistribution.Flat(
105      new Random(rng.nextLong()), options.minValLength, options.maxValLength);
106    RandomDistribution.DiscreteRNG wordLenGen = new RandomDistribution.Flat(
107      new Random(rng.nextLong()), options.minWordLen, options.maxWordLen);
108    kvGen = new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen, options.dictSize);
109  }
110
111  @After
112  public void tearDown() {
113    try {
114      fs.close();
115    } catch (Exception e) {
116      // Nothing
117    }
118  }
119
120  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs) throws IOException {
121    if (fs.exists(name)) {
122      fs.delete(name, true);
123    }
124    FSDataOutputStream fout = fs.create(name);
125    return fout;
126  }
127
128  private void createTFile() throws IOException {
129    long totalBytes = 0;
130    FSDataOutputStream fout = createFSOutput(path, fs);
131    try {
132      HFileContext context = new HFileContextBuilder().withBlockSize(options.minBlockSize)
133        .withCompression(HFileWriterImpl.compressionByName(options.compress)).build();
134      Writer writer = HFile.getWriterFactoryNoCache(conf).withOutputStream(fout)
135        .withFileContext(context).create();
136      try {
137        BytesWritable key = new BytesWritable();
138        BytesWritable val = new BytesWritable();
139        timer.start();
140        for (long i = 0; true; ++i) {
141          if (i % 1000 == 0) { // test the size for every 1000 rows.
142            if (fs.getFileStatus(path).getLen() >= options.fileSize) {
143              break;
144            }
145          }
146          kvGen.next(key, val, false);
147          byte[] k = new byte[key.getLength()];
148          System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
149          byte[] v = new byte[val.getLength()];
150          System.arraycopy(val.getBytes(), 0, v, 0, key.getLength());
151          KeyValue kv = new KeyValue(k, CF, QUAL, v);
152          writer.append(kv);
153          totalBytes += kv.getKeyLength();
154          totalBytes += kv.getValueLength();
155        }
156        timer.stop();
157      } finally {
158        writer.close();
159      }
160    } finally {
161      fout.close();
162    }
163    double duration = (double) timer.read() / 1000; // in us.
164    long fsize = fs.getFileStatus(path).getLen();
165
166    System.out.printf("time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n", timer.toString(),
167      (double) totalBytes / 1024 / 1024, totalBytes / duration);
168    System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n", timer.toString(),
169      (double) fsize / 1024 / 1024, fsize / duration);
170  }
171
172  public void seekTFile() throws IOException {
173    int miss = 0;
174    long totalBytes = 0;
175    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, path).build();
176    Reader reader = TestHFile.createReaderFromStream(context, new CacheConfig(conf), conf);
177    KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(),
178      ((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen);
179    HFileScanner scanner = reader.getScanner(conf, false, USE_PREAD);
180    BytesWritable key = new BytesWritable();
181    timer.reset();
182    timer.start();
183    for (int i = 0; i < options.seekCount; ++i) {
184      kSampler.next(key);
185      byte[] k = new byte[key.getLength()];
186      System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
187      KeyValue kv = new KeyValue(k, CF, QUAL);
188      if (scanner.seekTo(kv) >= 0) {
189        ByteBuffer bbkey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
190        ByteBuffer bbval = scanner.getValue();
191        totalBytes += bbkey.limit();
192        totalBytes += bbval.limit();
193      } else {
194        ++miss;
195      }
196    }
197    timer.stop();
198    System.out.printf("time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
199      timer.toString(), NanoTimer.nanoTimeToString(timer.read() / options.seekCount),
200      options.seekCount - miss, miss, (double) totalBytes / 1024 / (options.seekCount - miss));
201
202  }
203
204  @Test
205  public void testSeeks() throws IOException {
206    if (options.doCreate()) {
207      createTFile();
208    }
209
210    if (options.doRead()) {
211      seekTFile();
212    }
213
214    if (options.doCreate()) {
215      fs.delete(path, true);
216    }
217  }
218
219  private static class IntegerRange {
220    private final int from, to;
221
222    public IntegerRange(int from, int to) {
223      this.from = from;
224      this.to = to;
225    }
226
227    public static IntegerRange parse(String s) throws ParseException {
228      StringTokenizer st = new StringTokenizer(s, " \t,");
229      if (st.countTokens() != 2) {
230        throw new ParseException("Bad integer specification: " + s);
231      }
232      int from = Integer.parseInt(st.nextToken());
233      int to = Integer.parseInt(st.nextToken());
234      return new IntegerRange(from, to);
235    }
236
237    public int from() {
238      return from;
239    }
240
241    public int to() {
242      return to;
243    }
244  }
245
246  private static class MyOptions {
247    // hard coded constants
248    int dictSize = 1000;
249    int minWordLen = 5;
250    int maxWordLen = 20;
251
252    private HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
253    String rootDir = TEST_UTIL.getDataTestDir("TestTFileSeek").toString();
254    String file = "TestTFileSeek";
255    // String compress = "lzo"; DISABLED
256    String compress = "none";
257    int minKeyLen = 10;
258    int maxKeyLen = 50;
259    int minValLength = 1024;
260    int maxValLength = 2 * 1024;
261    int minBlockSize = 1 * 1024 * 1024;
262    int fsOutputBufferSize = 1;
263    int fsInputBufferSize = 0;
264    // Default writing 10MB.
265    long fileSize = 10 * 1024 * 1024;
266    long seekCount = 1000;
267    long trialCount = 1;
268    long seed;
269    boolean useRawFs = false;
270
271    static final int OP_CREATE = 1;
272    static final int OP_READ = 2;
273    int op = OP_CREATE | OP_READ;
274
275    boolean proceed = false;
276
277    public MyOptions(String[] args) {
278      seed = System.nanoTime();
279
280      try {
281        Options opts = buildOptions();
282        CommandLineParser parser = new GnuParser();
283        CommandLine line = parser.parse(opts, args, true);
284        processOptions(line, opts);
285        validateOptions();
286      } catch (ParseException e) {
287        System.out.println(e.getMessage());
288        System.out.println("Try \"--help\" option for details.");
289        setStopProceed();
290      }
291    }
292
293    public boolean proceed() {
294      return proceed;
295    }
296
297    private Options buildOptions() {
298      Option compress = OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz|snappy]")
299        .hasArg().withDescription("compression scheme").create('c');
300
301      Option fileSize = OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB").hasArg()
302        .withDescription("target size of the file (in MB).").create('s');
303
304      Option fsInputBufferSz = OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
305        .hasArg().withDescription("size of the file system input buffer (in bytes).").create('i');
306
307      Option fsOutputBufferSize = OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
308        .hasArg().withDescription("size of the file system output buffer (in bytes).").create('o');
309
310      Option keyLen = OptionBuilder.withLongOpt("key-length").withArgName("min,max").hasArg()
311        .withDescription("the length range of the key (in bytes)").create('k');
312
313      Option valueLen = OptionBuilder.withLongOpt("value-length").withArgName("min,max").hasArg()
314        .withDescription("the length range of the value (in bytes)").create('v');
315
316      Option blockSz = OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
317        .withDescription("minimum block size (in KB)").create('b');
318
319      Option operation = OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
320        .withDescription("action: seek-only, create-only, seek-after-create").create('x');
321
322      Option rootDir = OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
323        .withDescription("specify root directory where files will be created.").create('r');
324
325      Option file = OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
326        .withDescription("specify the file name to be created or read.").create('f');
327
328      Option seekCount = OptionBuilder.withLongOpt("seek").withArgName("count").hasArg()
329        .withDescription("specify how many seek operations we perform (requires -x r or -x rw.")
330        .create('n');
331
332      Option trialCount = OptionBuilder.withLongOpt("trials").withArgName("n").hasArg()
333        .withDescription("specify how many times to run the whole benchmark").create('t');
334
335      Option useRawFs = OptionBuilder.withLongOpt("rawfs")
336        .withDescription("use raw instead of checksummed file system").create();
337
338      Option help = OptionBuilder.withLongOpt("help").hasArg(false)
339        .withDescription("show this screen").create("h");
340
341      return new Options().addOption(compress).addOption(fileSize).addOption(fsInputBufferSz)
342        .addOption(fsOutputBufferSize).addOption(keyLen).addOption(blockSz).addOption(rootDir)
343        .addOption(valueLen).addOption(operation).addOption(seekCount).addOption(file)
344        .addOption(trialCount).addOption(useRawFs).addOption(help);
345
346    }
347
348    private void processOptions(CommandLine line, Options opts) throws ParseException {
349      // --help -h and --version -V must be processed first.
350      if (line.hasOption('h')) {
351        HelpFormatter formatter = new HelpFormatter();
352        System.out.println("TFile and SeqFile benchmark.");
353        System.out.println();
354        formatter.printHelp(100, "java ... TestTFileSeqFileComparison [options]",
355          "\nSupported options:", opts, "");
356        return;
357      }
358
359      if (line.hasOption('c')) {
360        compress = line.getOptionValue('c');
361      }
362
363      if (line.hasOption('d')) {
364        dictSize = Integer.parseInt(line.getOptionValue('d'));
365      }
366
367      if (line.hasOption('s')) {
368        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
369      }
370
371      if (line.hasOption('i')) {
372        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
373      }
374
375      if (line.hasOption('o')) {
376        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
377      }
378
379      if (line.hasOption('n')) {
380        seekCount = Integer.parseInt(line.getOptionValue('n'));
381      }
382
383      if (line.hasOption('t')) {
384        trialCount = Integer.parseInt(line.getOptionValue('t'));
385      }
386
387      if (line.hasOption('k')) {
388        IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
389        minKeyLen = ir.from();
390        maxKeyLen = ir.to();
391      }
392
393      if (line.hasOption('v')) {
394        IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
395        minValLength = ir.from();
396        maxValLength = ir.to();
397      }
398
399      if (line.hasOption('b')) {
400        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
401      }
402
403      if (line.hasOption('r')) {
404        rootDir = line.getOptionValue('r');
405      }
406
407      if (line.hasOption('f')) {
408        file = line.getOptionValue('f');
409      }
410
411      if (line.hasOption('S')) {
412        seed = Long.parseLong(line.getOptionValue('S'));
413      }
414
415      if (line.hasOption('x')) {
416        String strOp = line.getOptionValue('x');
417        if (strOp.equals("r")) {
418          op = OP_READ;
419        } else if (strOp.equals("w")) {
420          op = OP_CREATE;
421        } else if (strOp.equals("rw")) {
422          op = OP_CREATE | OP_READ;
423        } else {
424          throw new ParseException("Unknown action specifier: " + strOp);
425        }
426      }
427
428      useRawFs = line.hasOption("rawfs");
429
430      proceed = true;
431    }
432
433    private void validateOptions() throws ParseException {
434      if (
435        !compress.equals("none") && !compress.equals("lzo") && !compress.equals("gz")
436          && !compress.equals("snappy")
437      ) {
438        throw new ParseException("Unknown compression scheme: " + compress);
439      }
440
441      if (minKeyLen >= maxKeyLen) {
442        throw new ParseException("Max key length must be greater than min key length.");
443      }
444
445      if (minValLength >= maxValLength) {
446        throw new ParseException("Max value length must be greater than min value length.");
447      }
448
449      if (minWordLen >= maxWordLen) {
450        throw new ParseException("Max word length must be greater than min word length.");
451      }
452      return;
453    }
454
455    private void setStopProceed() {
456      proceed = false;
457    }
458
459    public boolean doCreate() {
460      return (op & OP_CREATE) != 0;
461    }
462
463    public boolean doRead() {
464      return (op & OP_READ) != 0;
465    }
466  }
467
468  public static void main(String[] argv) throws IOException {
469    TestHFileSeek testCase = new TestHFileSeek();
470    MyOptions options = new MyOptions(argv);
471
472    if (options.proceed == false) {
473      return;
474    }
475
476    testCase.options = options;
477    for (int i = 0; i < options.trialCount; i++) {
478      LOG.info("Beginning trial " + (i + 1));
479      testCase.setUp();
480      testCase.testSeeks();
481      testCase.tearDown();
482    }
483  }
484
485}