001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import java.util.Random;
023import java.util.StringTokenizer;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FSDataOutputStream;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.fs.RawLocalFileSystem;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.KeyValue;
031import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
032import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
033import org.apache.hadoop.hbase.testclassification.IOTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.RandomDistribution;
037import org.apache.hadoop.io.BytesWritable;
038import org.junit.jupiter.api.AfterEach;
039import org.junit.jupiter.api.BeforeEach;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
046import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
047import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
048import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
049import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
050import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
051import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
052import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
053
054/**
 * Tests the performance of seek operations.
056 * <p>
057 * Copied from <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
058 * Remove after tfile is committed and use the tfile version of this class instead.
059 * </p>
060 */
061@Tag(IOTests.TAG)
062@Tag(SmallTests.TAG)
063public class TestHFileSeek {
064
065  private static final byte[] CF = Bytes.toBytes("f1");
066  private static final byte[] QUAL = Bytes.toBytes("q1");
067  private static final boolean USE_PREAD = true;
068  private MyOptions options;
069  private Configuration conf;
070  private Path path;
071  private FileSystem fs;
072  private NanoTimer timer;
073  private Random rng;
074  private RandomDistribution.DiscreteRNG keyLenGen;
075  private KVGenerator kvGen;
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestHFileSeek.class);
078
079  @BeforeEach
080  public void setUp() throws IOException {
081    if (options == null) {
082      options = new MyOptions(new String[0]);
083    }
084
085    conf = new Configuration();
086
087    if (options.useRawFs) {
088      conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
089    }
090
091    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
092    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
093    path = new Path(new Path(options.rootDir), options.file);
094    fs = path.getFileSystem(conf);
095    timer = new NanoTimer(false);
096    rng = new Random(options.seed);
097    keyLenGen = new RandomDistribution.Zipf(new Random(rng.nextLong()), options.minKeyLen,
098      options.maxKeyLen, 1.2);
099    RandomDistribution.DiscreteRNG valLenGen = new RandomDistribution.Flat(
100      new Random(rng.nextLong()), options.minValLength, options.maxValLength);
101    RandomDistribution.DiscreteRNG wordLenGen = new RandomDistribution.Flat(
102      new Random(rng.nextLong()), options.minWordLen, options.maxWordLen);
103    kvGen = new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen, options.dictSize);
104  }
105
106  @AfterEach
107  public void tearDown() {
108    try {
109      fs.close();
110    } catch (Exception e) {
111      // Nothing
112    }
113  }
114
115  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs) throws IOException {
116    if (fs.exists(name)) {
117      fs.delete(name, true);
118    }
119    FSDataOutputStream fout = fs.create(name);
120    return fout;
121  }
122
123  private void createTFile() throws IOException {
124    long totalBytes = 0;
125    FSDataOutputStream fout = createFSOutput(path, fs);
126    try {
127      HFileContext context = new HFileContextBuilder().withBlockSize(options.minBlockSize)
128        .withCompression(HFileWriterImpl.compressionByName(options.compress)).build();
129      Writer writer = HFile.getWriterFactoryNoCache(conf).withOutputStream(fout)
130        .withFileContext(context).create();
131      try {
132        BytesWritable key = new BytesWritable();
133        BytesWritable val = new BytesWritable();
134        timer.start();
135        for (long i = 0; true; ++i) {
136          if (i % 1000 == 0) { // test the size for every 1000 rows.
137            if (fs.getFileStatus(path).getLen() >= options.fileSize) {
138              break;
139            }
140          }
141          kvGen.next(key, val, false);
142          byte[] k = new byte[key.getLength()];
143          System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
144          byte[] v = new byte[val.getLength()];
145          System.arraycopy(val.getBytes(), 0, v, 0, key.getLength());
146          KeyValue kv = new KeyValue(k, CF, QUAL, v);
147          writer.append(kv);
148          totalBytes += kv.getKeyLength();
149          totalBytes += kv.getValueLength();
150        }
151        timer.stop();
152      } finally {
153        writer.close();
154      }
155    } finally {
156      fout.close();
157    }
158    double duration = (double) timer.read() / 1000; // in us.
159    long fsize = fs.getFileStatus(path).getLen();
160
161    LOG.info(String.format("time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n",
162      timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes / duration));
163    LOG.info(String.format("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n",
164      timer.toString(), (double) fsize / 1024 / 1024, fsize / duration));
165  }
166
167  public void seekTFile() throws IOException {
168    int miss = 0;
169    long totalBytes = 0;
170    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, path).build();
171    Reader reader = TestHFile.createReaderFromStream(context, new CacheConfig(conf), conf);
172    KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(),
173      ((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen);
174    HFileScanner scanner = reader.getScanner(conf, false, USE_PREAD);
175    BytesWritable key = new BytesWritable();
176    timer.reset();
177    timer.start();
178    for (int i = 0; i < options.seekCount; ++i) {
179      kSampler.next(key);
180      byte[] k = new byte[key.getLength()];
181      System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
182      KeyValue kv = new KeyValue(k, CF, QUAL);
183      if (scanner.seekTo(kv) >= 0) {
184        ByteBuffer bbkey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
185        ByteBuffer bbval = scanner.getValue();
186        totalBytes += bbkey.limit();
187        totalBytes += bbval.limit();
188      } else {
189        ++miss;
190      }
191    }
192    timer.stop();
193    LOG.info(String.format("time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
194      timer.toString(), NanoTimer.nanoTimeToString(timer.read() / options.seekCount),
195      options.seekCount - miss, miss, (double) totalBytes / 1024 / (options.seekCount - miss)));
196
197  }
198
199  @Test
200  public void testSeeks() throws IOException {
201    if (options.doCreate()) {
202      createTFile();
203    }
204
205    if (options.doRead()) {
206      seekTFile();
207    }
208
209    if (options.doCreate()) {
210      fs.delete(path, true);
211    }
212  }
213
214  private static class IntegerRange {
215    private final int from, to;
216
217    public IntegerRange(int from, int to) {
218      this.from = from;
219      this.to = to;
220    }
221
222    public static IntegerRange parse(String s) throws ParseException {
223      StringTokenizer st = new StringTokenizer(s, " \t,");
224      if (st.countTokens() != 2) {
225        throw new ParseException("Bad integer specification: " + s);
226      }
227      int from = Integer.parseInt(st.nextToken());
228      int to = Integer.parseInt(st.nextToken());
229      return new IntegerRange(from, to);
230    }
231
232    public int from() {
233      return from;
234    }
235
236    public int to() {
237      return to;
238    }
239  }
240
241  private static class MyOptions {
242    // hard coded constants
243    int dictSize = 1000;
244    int minWordLen = 5;
245    int maxWordLen = 20;
246
247    private HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
248    String rootDir = TEST_UTIL.getDataTestDir("TestTFileSeek").toString();
249    String file = "TestTFileSeek";
250    // String compress = "lzo"; DISABLED
251    String compress = "none";
252    int minKeyLen = 10;
253    int maxKeyLen = 50;
254    int minValLength = 1024;
255    int maxValLength = 2 * 1024;
256    int minBlockSize = 1 * 1024 * 1024;
257    int fsOutputBufferSize = 1;
258    int fsInputBufferSize = 0;
259    // Default writing 10MB.
260    long fileSize = 10 * 1024 * 1024;
261    long seekCount = 1000;
262    long trialCount = 1;
263    long seed;
264    boolean useRawFs = false;
265
266    static final int OP_CREATE = 1;
267    static final int OP_READ = 2;
268    int op = OP_CREATE | OP_READ;
269
270    boolean proceed = false;
271
272    public MyOptions(String[] args) {
273      seed = System.nanoTime();
274
275      try {
276        Options opts = buildOptions();
277        CommandLineParser parser = new GnuParser();
278        CommandLine line = parser.parse(opts, args, true);
279        processOptions(line, opts);
280        validateOptions();
281      } catch (ParseException e) {
282        System.out.println(e.getMessage());
283        System.out.println("Try \"--help\" option for details.");
284        setStopProceed();
285      }
286    }
287
288    public boolean proceed() {
289      return proceed;
290    }
291
292    private Options buildOptions() {
293      Option compress = OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz|snappy]")
294        .hasArg().withDescription("compression scheme").create('c');
295
296      Option fileSize = OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB").hasArg()
297        .withDescription("target size of the file (in MB).").create('s');
298
299      Option fsInputBufferSz = OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
300        .hasArg().withDescription("size of the file system input buffer (in bytes).").create('i');
301
302      Option fsOutputBufferSize = OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
303        .hasArg().withDescription("size of the file system output buffer (in bytes).").create('o');
304
305      Option keyLen = OptionBuilder.withLongOpt("key-length").withArgName("min,max").hasArg()
306        .withDescription("the length range of the key (in bytes)").create('k');
307
308      Option valueLen = OptionBuilder.withLongOpt("value-length").withArgName("min,max").hasArg()
309        .withDescription("the length range of the value (in bytes)").create('v');
310
311      Option blockSz = OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
312        .withDescription("minimum block size (in KB)").create('b');
313
314      Option operation = OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
315        .withDescription("action: seek-only, create-only, seek-after-create").create('x');
316
317      Option rootDir = OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
318        .withDescription("specify root directory where files will be created.").create('r');
319
320      Option file = OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
321        .withDescription("specify the file name to be created or read.").create('f');
322
323      Option seekCount = OptionBuilder.withLongOpt("seek").withArgName("count").hasArg()
324        .withDescription("specify how many seek operations we perform (requires -x r or -x rw.")
325        .create('n');
326
327      Option trialCount = OptionBuilder.withLongOpt("trials").withArgName("n").hasArg()
328        .withDescription("specify how many times to run the whole benchmark").create('t');
329
330      Option useRawFs = OptionBuilder.withLongOpt("rawfs")
331        .withDescription("use raw instead of checksummed file system").create();
332
333      Option help = OptionBuilder.withLongOpt("help").hasArg(false)
334        .withDescription("show this screen").create("h");
335
336      return new Options().addOption(compress).addOption(fileSize).addOption(fsInputBufferSz)
337        .addOption(fsOutputBufferSize).addOption(keyLen).addOption(blockSz).addOption(rootDir)
338        .addOption(valueLen).addOption(operation).addOption(seekCount).addOption(file)
339        .addOption(trialCount).addOption(useRawFs).addOption(help);
340
341    }
342
343    private void processOptions(CommandLine line, Options opts) throws ParseException {
344      // --help -h and --version -V must be processed first.
345      if (line.hasOption('h')) {
346        HelpFormatter formatter = new HelpFormatter();
347        System.out.println("TFile and SeqFile benchmark.");
348        System.out.println();
349        formatter.printHelp(100, "java ... TestTFileSeqFileComparison [options]",
350          "\nSupported options:", opts, "");
351        return;
352      }
353
354      if (line.hasOption('c')) {
355        compress = line.getOptionValue('c');
356      }
357
358      if (line.hasOption('d')) {
359        dictSize = Integer.parseInt(line.getOptionValue('d'));
360      }
361
362      if (line.hasOption('s')) {
363        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
364      }
365
366      if (line.hasOption('i')) {
367        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
368      }
369
370      if (line.hasOption('o')) {
371        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
372      }
373
374      if (line.hasOption('n')) {
375        seekCount = Integer.parseInt(line.getOptionValue('n'));
376      }
377
378      if (line.hasOption('t')) {
379        trialCount = Integer.parseInt(line.getOptionValue('t'));
380      }
381
382      if (line.hasOption('k')) {
383        IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
384        minKeyLen = ir.from();
385        maxKeyLen = ir.to();
386      }
387
388      if (line.hasOption('v')) {
389        IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
390        minValLength = ir.from();
391        maxValLength = ir.to();
392      }
393
394      if (line.hasOption('b')) {
395        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
396      }
397
398      if (line.hasOption('r')) {
399        rootDir = line.getOptionValue('r');
400      }
401
402      if (line.hasOption('f')) {
403        file = line.getOptionValue('f');
404      }
405
406      if (line.hasOption('S')) {
407        seed = Long.parseLong(line.getOptionValue('S'));
408      }
409
410      if (line.hasOption('x')) {
411        String strOp = line.getOptionValue('x');
412        if (strOp.equals("r")) {
413          op = OP_READ;
414        } else if (strOp.equals("w")) {
415          op = OP_CREATE;
416        } else if (strOp.equals("rw")) {
417          op = OP_CREATE | OP_READ;
418        } else {
419          throw new ParseException("Unknown action specifier: " + strOp);
420        }
421      }
422
423      useRawFs = line.hasOption("rawfs");
424
425      proceed = true;
426    }
427
428    private void validateOptions() throws ParseException {
429      if (
430        !compress.equals("none") && !compress.equals("lzo") && !compress.equals("gz")
431          && !compress.equals("snappy")
432      ) {
433        throw new ParseException("Unknown compression scheme: " + compress);
434      }
435
436      if (minKeyLen >= maxKeyLen) {
437        throw new ParseException("Max key length must be greater than min key length.");
438      }
439
440      if (minValLength >= maxValLength) {
441        throw new ParseException("Max value length must be greater than min value length.");
442      }
443
444      if (minWordLen >= maxWordLen) {
445        throw new ParseException("Max word length must be greater than min word length.");
446      }
447      return;
448    }
449
450    private void setStopProceed() {
451      proceed = false;
452    }
453
454    public boolean doCreate() {
455      return (op & OP_CREATE) != 0;
456    }
457
458    public boolean doRead() {
459      return (op & OP_READ) != 0;
460    }
461  }
462
463  public static void main(String[] argv) throws IOException {
464    TestHFileSeek testCase = new TestHFileSeek();
465    MyOptions options = new MyOptions(argv);
466
467    if (options.proceed == false) {
468      return;
469    }
470
471    testCase.options = options;
472    for (int i = 0; i < options.trialCount; i++) {
473      LOG.info("Beginning trial " + (i + 1));
474      testCase.setUp();
475      testCase.testSeeks();
476      testCase.tearDown();
477    }
478  }
479
480}