001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.Random; 023import java.util.StringTokenizer; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FSDataOutputStream; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.fs.RawLocalFileSystem; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 033import org.apache.hadoop.hbase.io.hfile.HFile.Writer; 034import org.apache.hadoop.hbase.testclassification.IOTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.apache.hadoop.hbase.util.RandomDistribution; 037import org.apache.hadoop.io.BytesWritable; 038import org.junit.After; 039import org.junit.Before; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import 
org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * Measures seek performance against an HFile.
 * <p>
 * Each trial optionally writes an HFile of roughly {@code fileSize} bytes filled with generated
 * key/values ("create"), optionally performs {@code seekCount} random seeks on it ("read"), and
 * deletes the file again when it was created by the same trial. Timings are printed to stdout.
 * Run as a JUnit test it uses the default {@link MyOptions}; run through {@link #main(String[])}
 * it is driven by command-line options.
 * </p>
 * <p>
 * Copied from <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
 * Remove after tfile is committed and use the tfile version of this class instead.
 * </p>
 */
@Category({ IOTests.class, SmallTests.class })
public class TestHFileSeek {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileSeek.class);

  private static final byte[] CF = "f1".getBytes();
  private static final byte[] QUAL = "q1".getBytes();
  private static final boolean USE_PREAD = true;
  private MyOptions options;
  private Configuration conf;
  private Path path;
  private FileSystem fs;
  private NanoTimer timer;
  private Random rng;
  private RandomDistribution.DiscreteRNG keyLenGen;
  private KVGenerator kvGen;

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileSeek.class);

  /**
   * Prepares one trial: builds the configuration, the target path and the random generators from
   * {@link #options}. Default options are used when none were supplied (the JUnit path).
   */
  @Before
  public void setUp() throws IOException {
    if (options == null) {
      options = new MyOptions(new String[0]);
    }

    conf = new Configuration();

    if (options.useRawFs) {
      conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
    }

    conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
    conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
    path = new Path(new Path(options.rootDir), options.file);
    fs = path.getFileSystem(conf);
    timer = new NanoTimer(false);
    rng = new Random(options.seed);
    // Every generator is seeded from rng, so a fixed --seed reproduces the whole run.
    keyLenGen = new RandomDistribution.Zipf(new Random(rng.nextLong()), options.minKeyLen,
      options.maxKeyLen, 1.2);
    RandomDistribution.DiscreteRNG valLenGen = new RandomDistribution.Flat(
      new Random(rng.nextLong()), options.minValLength, options.maxValLength);
    RandomDistribution.DiscreteRNG wordLenGen = new RandomDistribution.Flat(
      new Random(rng.nextLong()), options.minWordLen, options.maxWordLen);
    kvGen = new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen, options.dictSize);
  }

  /** Closes the file system opened by {@link #setUp()}; failures are logged and ignored. */
  @After
  public void tearDown() {
    if (fs == null) {
      return;
    }
    try {
      fs.close();
    } catch (Exception e) {
      // Best effort: a failed close must not fail the test or abort the remaining trials.
      LOG.debug("Failed to close file system for {}", path, e);
    }
  }

  /**
   * Returns a copy of the valid portion of {@code w}. The array returned by
   * {@link BytesWritable#getBytes()} may be longer than {@link BytesWritable#getLength()}.
   */
  private static byte[] copyValidBytes(BytesWritable w) {
    byte[] copy = new byte[w.getLength()];
    System.arraycopy(w.getBytes(), 0, copy, 0, w.getLength());
    return copy;
  }

  /** Creates {@code name} on {@code fs}, first deleting any existing file at that path. */
  private static FSDataOutputStream createFSOutput(Path name, FileSystem fs) throws IOException {
    if (fs.exists(name)) {
      fs.delete(name, true);
    }
    return fs.create(name);
  }

  /**
   * Writes generated key/values to {@link #path} until the file reaches
   * {@code options.fileSize} bytes, then prints the write throughput.
   */
  private void createTFile() throws IOException {
    long totalBytes = 0;
    try (FSDataOutputStream fout = createFSOutput(path, fs)) {
      HFileContext context = new HFileContextBuilder().withBlockSize(options.minBlockSize)
        .withCompression(HFileWriterImpl.compressionByName(options.compress)).build();
      // The writer is closed before fout (reverse declaration order), as the original did.
      try (Writer writer = HFile.getWriterFactoryNoCache(conf).withOutputStream(fout)
        .withFileContext(context).create()) {
        BytesWritable key = new BytesWritable();
        BytesWritable val = new BytesWritable();
        timer.start();
        for (long rows = 0;; ++rows) {
          // The on-disk length is only checked once every 1000 rows.
          if (rows % 1000 == 0 && fs.getFileStatus(path).getLen() >= options.fileSize) {
            break;
          }
          kvGen.next(key, val, false);
          // Copy each writable using its own length; the value must not use the key's length.
          KeyValue kv = new KeyValue(copyValidBytes(key), CF, QUAL, copyValidBytes(val));
          writer.append(kv);
          totalBytes += kv.getKeyLength();
          totalBytes += kv.getValueLength();
        }
        timer.stop();
      }
    }
    double duration = (double) timer.read() / 1000; // in us.
    long fsize = fs.getFileStatus(path).getLen();

    // NOTE: bytes per microsecond equals (decimal) MB/s, while sizes are printed in 1024-based MB.
    System.out.printf("time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n", timer.toString(),
      (double) totalBytes / 1024 / 1024, totalBytes / duration);
    System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n", timer.toString(),
      (double) fsize / 1024 / 1024, fsize / duration);
  }

  /**
   * Performs {@code options.seekCount} seeks to sampled keys in {@link #path} and prints the
   * average seek time, the hit/miss counts and the average number of bytes read per hit.
   */
  public void seekTFile() throws IOException {
    long miss = 0;
    long totalBytes = 0;
    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, path).build();
    Reader reader = TestHFile.createReaderFromStream(context, new CacheConfig(conf), conf);
    try {
      KeySampler kSampler = new KeySampler(rng, ((KeyValue) reader.getFirstKey().get()).getKey(),
        ((KeyValue) reader.getLastKey().get()).getKey(), keyLenGen);
      HFileScanner scanner = reader.getScanner(conf, false, USE_PREAD);
      BytesWritable key = new BytesWritable();
      timer.reset();
      timer.start();
      for (long i = 0; i < options.seekCount; ++i) {
        kSampler.next(key);
        KeyValue kv = new KeyValue(copyValidBytes(key), CF, QUAL);
        if (scanner.seekTo(kv) >= 0) {
          ByteBuffer bbkey = ByteBuffer.wrap(((KeyValue) scanner.getKey()).getKey());
          ByteBuffer bbval = scanner.getValue();
          totalBytes += bbkey.limit();
          totalBytes += bbval.limit();
        } else {
          ++miss;
        }
      }
      timer.stop();
    } finally {
      // Previously the reader was never closed and leaked once per trial.
      reader.close();
    }
    long hits = options.seekCount - miss;
    // Guard the averages: seekCount can be 0 and every seek can miss.
    long avgSeekNanos = options.seekCount == 0 ? 0 : timer.read() / options.seekCount;
    double avgIoKb = hits == 0 ? 0.0 : (double) totalBytes / 1024 / hits;
    System.out.printf("time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
      timer.toString(), NanoTimer.nanoTimeToString(avgSeekNanos), hits, miss, avgIoKb);
  }

  /**
   * Runs one create and/or seek pass as selected by the options. A file created by this pass is
   * deleted even if the pass fails.
   */
  @Test
  public void testSeeks() throws IOException {
    try {
      if (options.doCreate()) {
        createTFile();
      }

      if (options.doRead()) {
        seekTFile();
      }
    } finally {
      if (options.doCreate()) {
        fs.delete(path, true);
      }
    }
  }

  /** An inclusive-looking {@code from,to} pair parsed from a command-line value. */
  private static class IntegerRange {
    private final int from, to;

    public IntegerRange(int from, int to) {
      this.from = from;
      this.to = to;
    }

    /**
     * Parses two integers separated by spaces, tabs and/or commas, e.g. {@code "10,50"}.
     * @throws ParseException if there are not exactly two tokens or a token is not an integer
     */
    public static IntegerRange parse(String s) throws ParseException {
      StringTokenizer st = new StringTokenizer(s, " \t,");
      if (st.countTokens() != 2) {
        throw new ParseException("Bad integer specification: " + s);
      }
      try {
        int from = Integer.parseInt(st.nextToken());
        int to = Integer.parseInt(st.nextToken());
        return new IntegerRange(from, to);
      } catch (NumberFormatException e) {
        throw new ParseException("Bad integer specification: " + s);
      }
    }

    public int from() {
      return from;
    }

    public int to() {
      return to;
    }
  }

  /** Benchmark parameters: hard-coded defaults, optionally overridden from the command line. */
  private static class MyOptions {
    // hard coded constants
    int dictSize = 1000;
    int minWordLen = 5;
    int maxWordLen = 20;

    private HBaseTestingUtility testUtil = new HBaseTestingUtility();
    String rootDir = testUtil.getDataTestDir("TestTFileSeek").toString();
    String file = "TestTFileSeek";
    // String compress = "lzo"; DISABLED
    String compress = "none";
    int minKeyLen = 10;
    int maxKeyLen = 50;
    int minValLength = 1024;
    int maxValLength = 2 * 1024;
    int minBlockSize = 1 * 1024 * 1024;
    int fsOutputBufferSize = 1;
    int fsInputBufferSize = 0;
    // Default writing 10MB.
    long fileSize = 10 * 1024 * 1024;
    long seekCount = 1000;
    long trialCount = 1;
    long seed;
    boolean useRawFs = false;

    static final int OP_CREATE = 1;
    static final int OP_READ = 2;
    int op = OP_CREATE | OP_READ;

    boolean proceed = false;

    /**
     * Parses and validates {@code args}. On a parse or validation error (including a malformed
     * number) the message is printed and {@link #proceed()} stays {@code false}.
     */
    public MyOptions(String[] args) {
      seed = System.nanoTime();

      try {
        Options opts = buildOptions();
        CommandLineParser parser = new GnuParser();
        CommandLine line = parser.parse(opts, args, true);
        processOptions(line, opts);
        validateOptions();
      } catch (ParseException | NumberFormatException e) {
        System.out.println(e.getMessage());
        System.out.println("Try \"--help\" option for details.");
        setStopProceed();
      }
    }

    /** Returns whether the benchmark should run (false after --help or an option error). */
    public boolean proceed() {
      return proceed;
    }

    /** Declares every supported command-line option. */
    private Options buildOptions() {
      Option compress = OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz|snappy]")
        .hasArg().withDescription("compression scheme").create('c');

      Option dictSize = OptionBuilder.withLongOpt("dict").withArgName("size").hasArg()
        .withDescription("dictionary size passed to the key/value generator").create('d');

      Option fileSize = OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB").hasArg()
        .withDescription("target size of the file (in MB).").create('s');

      Option fsInputBufferSz = OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
        .hasArg().withDescription("size of the file system input buffer (in bytes).").create('i');

      Option fsOutputBufferSize = OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
        .hasArg().withDescription("size of the file system output buffer (in bytes).").create('o');

      Option keyLen = OptionBuilder.withLongOpt("key-length").withArgName("min,max").hasArg()
        .withDescription("the length range of the key (in bytes)").create('k');

      Option valueLen = OptionBuilder.withLongOpt("value-length").withArgName("min,max").hasArg()
        .withDescription("the length range of the value (in bytes)").create('v');

      Option blockSz = OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
        .withDescription("minimum block size (in KB)").create('b');

      Option operation = OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
        .withDescription("action: seek-only, create-only, seek-after-create").create('x');

      Option rootDir = OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
        .withDescription("specify root directory where files will be created.").create('r');

      Option file = OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
        .withDescription("specify the file name to be created or read.").create('f');

      Option seekCount = OptionBuilder.withLongOpt("seek").withArgName("count").hasArg()
        .withDescription("specify how many seek operations we perform (requires -x r or -x rw).")
        .create('n');

      Option trialCount = OptionBuilder.withLongOpt("trials").withArgName("n").hasArg()
        .withDescription("specify how many times to run the whole benchmark").create('t');

      Option seed = OptionBuilder.withLongOpt("seed").withArgName("long").hasArg()
        .withDescription("seed for the random generators (defaults to System.nanoTime())")
        .create('S');

      Option useRawFs = OptionBuilder.withLongOpt("rawfs")
        .withDescription("use raw instead of checksummed file system").create();

      Option help = OptionBuilder.withLongOpt("help").hasArg(false)
        .withDescription("show this screen").create("h");

      return new Options().addOption(compress).addOption(dictSize).addOption(fileSize)
        .addOption(fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen)
        .addOption(blockSz).addOption(rootDir).addOption(valueLen).addOption(operation)
        .addOption(seekCount).addOption(file).addOption(trialCount).addOption(seed)
        .addOption(useRawFs).addOption(help);
    }

    /**
     * Copies parsed option values into the fields. With --help it only prints usage and leaves
     * {@link #proceed} false.
     * @throws ParseException for a malformed range or an unknown -x action
     */
    private void processOptions(CommandLine line, Options opts) throws ParseException {
      // --help -h and --version -V must be processed first.
      if (line.hasOption('h')) {
        HelpFormatter formatter = new HelpFormatter();
        System.out.println("HFile seek benchmark.");
        System.out.println();
        formatter.printHelp(100, "java ... TestHFileSeek [options]",
          "\nSupported options:", opts, "");
        return;
      }

      if (line.hasOption('c')) {
        compress = line.getOptionValue('c');
      }

      if (line.hasOption('d')) {
        dictSize = Integer.parseInt(line.getOptionValue('d'));
      }

      if (line.hasOption('s')) {
        fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
      }

      if (line.hasOption('i')) {
        fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
      }

      if (line.hasOption('o')) {
        fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
      }

      if (line.hasOption('n')) {
        seekCount = Long.parseLong(line.getOptionValue('n'));
      }

      if (line.hasOption('t')) {
        trialCount = Long.parseLong(line.getOptionValue('t'));
      }

      if (line.hasOption('k')) {
        IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
        minKeyLen = ir.from();
        maxKeyLen = ir.to();
      }

      if (line.hasOption('v')) {
        IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
        minValLength = ir.from();
        maxValLength = ir.to();
      }

      if (line.hasOption('b')) {
        minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
      }

      if (line.hasOption('r')) {
        rootDir = line.getOptionValue('r');
      }

      if (line.hasOption('f')) {
        file = line.getOptionValue('f');
      }

      if (line.hasOption('S')) {
        seed = Long.parseLong(line.getOptionValue('S'));
      }

      if (line.hasOption('x')) {
        String strOp = line.getOptionValue('x');
        if (strOp.equals("r")) {
          op = OP_READ;
        } else if (strOp.equals("w")) {
          op = OP_CREATE;
        } else if (strOp.equals("rw")) {
          op = OP_CREATE | OP_READ;
        } else {
          throw new ParseException("Unknown action specifier: " + strOp);
        }
      }

      useRawFs = line.hasOption("rawfs");

      proceed = true;
    }

    /** Rejects unknown compression schemes, empty length ranges and non-positive seek counts. */
    private void validateOptions() throws ParseException {
      if (
        !compress.equals("none") && !compress.equals("lzo") && !compress.equals("gz")
          && !compress.equals("snappy")
      ) {
        throw new ParseException("Unknown compression scheme: " + compress);
      }

      if (minKeyLen >= maxKeyLen) {
        throw new ParseException("Max key length must be greater than min key length.");
      }

      if (minValLength >= maxValLength) {
        throw new ParseException("Max value length must be greater than min value length.");
      }

      if (minWordLen >= maxWordLen) {
        throw new ParseException("Max word length must be greater than min word length.");
      }

      // seekTFile divides by seekCount.
      if (doRead() && seekCount <= 0) {
        throw new ParseException("Seek count must be positive.");
      }
    }

    private void setStopProceed() {
      proceed = false;
    }

    /** Returns whether the file should be written (-x w or -x rw). */
    public boolean doCreate() {
      return (op & OP_CREATE) != 0;
    }

    /** Returns whether the file should be seeked (-x r or -x rw). */
    public boolean doRead() {
      return (op & OP_READ) != 0;
    }
  }

  /**
   * Command-line entry point: runs {@code --trials} trials, each with its own setUp/tearDown.
   * tearDown runs even if a trial fails.
   */
  public static void main(String[] argv) throws IOException {
    MyOptions options = new MyOptions(argv);

    if (!options.proceed()) {
      return;
    }

    TestHFileSeek testCase = new TestHFileSeek();
    testCase.options = options;
    for (long i = 0; i < options.trialCount; i++) {
      LOG.info("Beginning trial {}", i + 1);
      testCase.setUp();
      try {
        testCase.testSeeks();
      } finally {
        testCase.tearDown();
      }
    }
  }

}