001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.security.SecureRandom;
023import java.util.Random;
024
025import org.apache.commons.math3.random.RandomData;
026import org.apache.commons.math3.random.RandomDataImpl;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
034import org.apache.hadoop.hbase.io.crypto.CryptoCipherProvider;
035import org.apache.hadoop.hbase.io.crypto.DefaultCipherProvider;
036import org.apache.hadoop.hbase.io.crypto.Encryption;
037import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
038import org.apache.hadoop.hbase.io.crypto.aes.AES;
039import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
040import org.apache.hadoop.hbase.io.hfile.CacheConfig;
041import org.apache.hadoop.hbase.io.hfile.HFile;
042import org.apache.hadoop.hbase.io.hfile.HFileContext;
043import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
044import org.apache.hadoop.hbase.io.hfile.HFileScanner;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047
048/**
049 * This class runs performance benchmarks for {@link HFile}.
050 */
051@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
052public class HFilePerformanceEvaluation {
  // Size in bytes of the random value written for every row; the read benchmarks assert that
  // each value they read back has this length.
  private static final int ROW_LENGTH = 10;
  // Row count handed to every benchmark started from this class.
  private static final int ROW_COUNT = 1000000;
  // Block size passed to the HFileContextBuilder when the test HFile is written.
  private static final int RFILE_BLOCKSIZE = 8 * 1024;
  // Accumulates one result (or failure) entry per benchmark; logged at the end of runBenchmarks().
  // NOTE(review): entries are appended from Runnables handed to
  // PerformanceEvaluationCommons.concurrentReads; StringBuilder is not synchronized -- confirm
  // those Runnables never append concurrently.
  private static StringBuilder testSummary = new StringBuilder();
  
  // Disable verbose INFO logging from org.apache.hadoop.io.compress.CodecPool
  // Static initializers run in textual order, so these properties are set before LOG is created.
  static {
    System.setProperty("org.apache.commons.logging.Log", 
      "org.apache.commons.logging.impl.SimpleLog");
    System.setProperty("org.apache.commons.logging.simplelog.log.org.apache.hadoop.io.compress.CodecPool",
      "WARN");
  }
  
  // Logger shared by the outer class and all nested benchmark classes.
  private static final Logger LOG =
    LoggerFactory.getLogger(HFilePerformanceEvaluation.class.getName());
068
069  static byte [] format(final int i) {
070    String v = Integer.toString(i);
071    return Bytes.toBytes("0000000000".substring(v.length()) + v);
072  }
073
074  static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
075    w.set(format(i));
076    return w;
077  }
078
079  static Cell createCell(final int i) {
080    return createCell(i, HConstants.EMPTY_BYTE_ARRAY);
081  }
082
083  /**
084   * HFile is Cell-based. It used to be byte arrays.  Doing this test, pass Cells. All Cells
085   * intentionally have same coordinates in all fields but row.
086   * @param i Integer to format as a row Key.
087   * @param value Value to use
088   * @return Created Cell.
089   */
090  static Cell createCell(final int i, final byte [] value) {
091    return createCell(format(i), value);
092  }
093
094  static Cell createCell(final byte [] keyRow) {
095    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
096      .setRow(keyRow)
097      .setFamily(HConstants.EMPTY_BYTE_ARRAY)
098      .setQualifier(HConstants.EMPTY_BYTE_ARRAY)
099      .setTimestamp(HConstants.LATEST_TIMESTAMP)
100      .setType(KeyValue.Type.Maximum.getCode())
101      .setValue(HConstants.EMPTY_BYTE_ARRAY)
102      .build();
103  }
104
105  static Cell createCell(final byte [] keyRow, final byte [] value) {
106    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
107      .setRow(keyRow)
108      .setFamily(HConstants.EMPTY_BYTE_ARRAY)
109      .setQualifier(HConstants.EMPTY_BYTE_ARRAY)
110      .setTimestamp(HConstants.LATEST_TIMESTAMP)
111      .setType(KeyValue.Type.Maximum.getCode())
112      .setValue(value)
113      .build();
114  }
115
116  /**
117   * Add any supported codec or cipher to test the HFile read/write performance. 
118   * Specify "none" to disable codec or cipher or both.  
119   * @throws Exception
120   */
121  private void runBenchmarks() throws Exception {
122    final Configuration conf = new Configuration();
123    final FileSystem fs = FileSystem.get(conf);
124    final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
125    
126    // codec=none cipher=none
127    runWriteBenchmark(conf, fs, mf, "none", "none");
128    runReadBenchmark(conf, fs, mf, "none", "none");
129    
130    // codec=gz cipher=none
131    runWriteBenchmark(conf, fs, mf, "gz", "none");
132    runReadBenchmark(conf, fs, mf, "gz", "none");
133
134    // Add configuration for AES cipher
135    final Configuration aesconf = new Configuration();
136    aesconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
137    aesconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
138    aesconf.setInt("hfile.format.version", 3);
139    final FileSystem aesfs = FileSystem.get(aesconf);
140    final Path aesmf = aesfs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
141
142    // codec=none cipher=aes
143    runWriteBenchmark(aesconf, aesfs, aesmf, "none", "aes");
144    runReadBenchmark(aesconf, aesfs, aesmf, "none", "aes");
145
146    // codec=gz cipher=aes
147    runWriteBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
148    runReadBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
149
150    // Add configuration for Commons cipher
151    final Configuration cryptoconf = new Configuration();
152    cryptoconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
153    cryptoconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
154    cryptoconf.setInt("hfile.format.version", 3);
155    cryptoconf.set(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY, CryptoCipherProvider.class.getName());
156    final FileSystem cryptofs = FileSystem.get(cryptoconf);
157    final Path cryptof = cryptofs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
158
159    // codec=none cipher=aes
160    runWriteBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
161    runReadBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
162
163    // codec=gz cipher=aes
164    runWriteBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
165    runReadBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
166
167    // cleanup test files
168    if (fs.exists(mf)) {
169      fs.delete(mf, true);
170    }
171    if (aesfs.exists(aesmf)) {
172      aesfs.delete(aesmf, true);
173    }
174    if (cryptofs.exists(aesmf)) {
175      cryptofs.delete(cryptof, true);
176    }
177
178    // Print Result Summary
179    LOG.info("\n***************\n" + "Result Summary" + "\n***************\n");
180    LOG.info(testSummary.toString());
181
182  }
183
184  /**
185   * Write a test HFile with the given codec & cipher
186   * @param conf
187   * @param fs
188   * @param mf
189   * @param codec "none", "lzo", "gz", "snappy"
190   * @param cipher "none", "aes"
191   * @throws Exception
192   */
193  private void runWriteBenchmark(Configuration conf, FileSystem fs, Path mf, String codec,
194      String cipher) throws Exception {
195    if (fs.exists(mf)) {
196      fs.delete(mf, true);
197    }
198
199    runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT, codec, cipher),
200        ROW_COUNT, codec, getCipherName(conf, cipher));
201
202  }
203
204  /**
205   * Run all the read benchmarks for the test HFile 
206   * @param conf
207   * @param fs
208   * @param mf
209   * @param codec "none", "lzo", "gz", "snappy"
210   * @param cipher "none", "aes"
211   */
212  private void runReadBenchmark(final Configuration conf, final FileSystem fs, final Path mf,
213      final String codec, final String cipher) {
214    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
215      @Override
216      public void run() {
217        try {
218          runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
219            ROW_COUNT, codec, getCipherName(conf, cipher));
220        } catch (Exception e) {
221          testSummary.append("UniformRandomSmallScan failed " + e.getMessage());
222          e.printStackTrace();
223        }
224      }
225    });
226    
227    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
228      @Override
229      public void run() {
230        try {
231          runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
232              ROW_COUNT, codec, getCipherName(conf, cipher));
233        } catch (Exception e) {
234          testSummary.append("UniformRandomReadBenchmark failed " + e.getMessage());
235          e.printStackTrace();
236        }
237      }
238    });
239    
240    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
241      @Override
242      public void run() {
243        try {
244          runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
245              ROW_COUNT, codec, getCipherName(conf, cipher));
246        } catch (Exception e) {
247          testSummary.append("GaussianRandomReadBenchmark failed " + e.getMessage());
248          e.printStackTrace();
249        }
250      }
251    });
252    
253    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
254      @Override
255      public void run() {
256        try {
257          runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
258              ROW_COUNT, codec, getCipherName(conf, cipher));
259        } catch (Exception e) {
260          testSummary.append("SequentialReadBenchmark failed " + e.getMessage());
261          e.printStackTrace();
262        }
263      }
264    });    
265
266  }
267  
268  protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount,
269      String codec, String cipher) throws Exception {
270    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + 
271        codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows.");
272    
273    long elapsedTime = benchmark.run();
274    
275    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + 
276        codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows took " + 
277        elapsedTime + "ms.");
278    
279    // Store results to print summary at the end
280    testSummary.append("Running ").append(benchmark.getClass().getSimpleName())
281        .append(" with codec[").append(codec).append("] cipher[").append(cipher)
282        .append("] for ").append(rowCount).append(" rows took ").append(elapsedTime)
283        .append("ms.").append("\n");
284  }
285
286  static abstract class RowOrientedBenchmark {
287
288    protected final Configuration conf;
289    protected final FileSystem fs;
290    protected final Path mf;
291    protected final int totalRows;
292    protected String codec = "none";
293    protected String cipher = "none";
294
295    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
296        int totalRows, String codec, String cipher) {
297      this.conf = conf;
298      this.fs = fs;
299      this.mf = mf;
300      this.totalRows = totalRows;
301      this.codec = codec;
302      this.cipher = cipher;
303    }
304
305    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
306        int totalRows) {
307      this.conf = conf;
308      this.fs = fs;
309      this.mf = mf;
310      this.totalRows = totalRows;
311    }
312
313    void setUp() throws Exception {
314      // do nothing
315    }
316
317    abstract void doRow(int i) throws Exception;
318
319    protected int getReportingPeriod() {
320      return this.totalRows / 10;
321    }
322
323    void tearDown() throws Exception {
324      // do nothing
325    }
326
327    /**
328     * Run benchmark
329     * @return elapsed time.
330     * @throws Exception
331     */
332    long run() throws Exception {
333      long elapsedTime;
334      setUp();
335      long startTime = EnvironmentEdgeManager.currentTime();
336      try {
337        for (int i = 0; i < totalRows; i++) {
338          if (i > 0 && i % getReportingPeriod() == 0) {
339            LOG.info("Processed " + i + " rows.");
340          }
341          doRow(i);
342        }
343        elapsedTime = EnvironmentEdgeManager.currentTime() - startTime;
344      } finally {
345        tearDown();
346      }
347      return elapsedTime;
348    }
349
350  }
351
352  static class SequentialWriteBenchmark extends RowOrientedBenchmark {
353    protected HFile.Writer writer;
354    private Random random = new Random();
355    private byte[] bytes = new byte[ROW_LENGTH];
356
357    public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
358        int totalRows, String codec, String cipher) {
359      super(conf, fs, mf, totalRows, codec, cipher);
360    }
361
362    @Override
363    void setUp() throws Exception {
364
365      HFileContextBuilder builder = new HFileContextBuilder()
366          .withCompression(HFileWriterImpl.compressionByName(codec))
367          .withBlockSize(RFILE_BLOCKSIZE);
368      
369      if (cipher == "aes") {
370        byte[] cipherKey = new byte[AES.KEY_LENGTH];
371        new SecureRandom().nextBytes(cipherKey);
372        builder.withEncryptionContext(Encryption.newContext(conf)
373            .setCipher(Encryption.getCipher(conf, cipher))
374            .setKey(cipherKey));
375      } else if (!"none".equals(cipher)) {
376        throw new IOException("Cipher " + cipher + " not supported.");
377      }
378      
379      HFileContext hFileContext = builder.build();
380
381      writer = HFile.getWriterFactoryNoCache(conf)
382          .withPath(fs, mf)
383          .withFileContext(hFileContext)
384          .create();
385    }
386    
387    @Override
388    void doRow(int i) throws Exception {
389      writer.append(createCell(i, generateValue()));
390    }
391
392    private byte[] generateValue() {
393      random.nextBytes(bytes);
394      return bytes;
395    }
396
397    @Override
398    protected int getReportingPeriod() {
399      return this.totalRows; // don't report progress
400    }
401
402    @Override
403    void tearDown() throws Exception {
404      writer.close();
405    }
406
407  }
408
409  static abstract class ReadBenchmark extends RowOrientedBenchmark {
410
411    protected HFile.Reader reader;
412
413    public ReadBenchmark(Configuration conf, FileSystem fs, Path mf,
414        int totalRows) {
415      super(conf, fs, mf, totalRows);
416    }
417
418    @Override
419    void setUp() throws Exception {
420      reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), true, this.conf);
421    }
422
423    @Override
424    void tearDown() throws Exception {
425      reader.close();
426    }
427
428  }
429
430  static class SequentialReadBenchmark extends ReadBenchmark {
431    private HFileScanner scanner;
432
433    public SequentialReadBenchmark(Configuration conf, FileSystem fs,
434      Path mf, int totalRows) {
435      super(conf, fs, mf, totalRows);
436    }
437
438    @Override
439    void setUp() throws Exception {
440      super.setUp();
441      this.scanner = this.reader.getScanner(conf, false, false);
442      this.scanner.seekTo();
443    }
444
445    @Override
446    void doRow(int i) throws Exception {
447      if (this.scanner.next()) {
448        // TODO: Fix. Make Scanner do Cells.
449        Cell c = this.scanner.getCell();
450        PerformanceEvaluationCommons.assertKey(format(i + 1), c);
451        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
452      }
453    }
454
455    @Override
456    protected int getReportingPeriod() {
457      return this.totalRows; // don't report progress
458    }
459
460  }
461
462  static class UniformRandomReadBenchmark extends ReadBenchmark {
463
464    private Random random = new Random();
465
466    public UniformRandomReadBenchmark(Configuration conf, FileSystem fs,
467        Path mf, int totalRows) {
468      super(conf, fs, mf, totalRows);
469    }
470
471    @Override
472    void doRow(int i) throws Exception {
473      HFileScanner scanner = this.reader.getScanner(conf, false, true);
474      byte [] b = getRandomRow();
475      if (scanner.seekTo(createCell(b)) < 0) {
476        LOG.info("Not able to seekTo " + new String(b));
477        return;
478      }
479      // TODO: Fix scanner so it does Cells
480      Cell c = scanner.getCell();
481      PerformanceEvaluationCommons.assertKey(b, c);
482      PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
483    }
484
485    private byte [] getRandomRow() {
486      return format(random.nextInt(totalRows));
487    }
488  }
489
490  static class UniformRandomSmallScan extends ReadBenchmark {
491    private Random random = new Random();
492
493    public UniformRandomSmallScan(Configuration conf, FileSystem fs,
494        Path mf, int totalRows) {
495      super(conf, fs, mf, totalRows/10);
496    }
497
498    @Override
499    void doRow(int i) throws Exception {
500      HFileScanner scanner = this.reader.getScanner(conf, false, false);
501      byte [] b = getRandomRow();
502      // System.out.println("Random row: " + new String(b));
503      Cell c = createCell(b);
504      if (scanner.seekTo(c) != 0) {
505        LOG.info("Nonexistent row: " + new String(b));
506        return;
507      }
508      // TODO: HFileScanner doesn't do Cells yet. Temporary fix.
509      c = scanner.getCell();
510      // System.out.println("Found row: " +
511      //  new String(c.getRowArray(), c.getRowOffset(), c.getRowLength()));
512      PerformanceEvaluationCommons.assertKey(b, c);
513      for (int ii = 0; ii < 30; ii++) {
514        if (!scanner.next()) {
515          LOG.info("NOTHING FOLLOWS");
516          return;
517        }
518        c = scanner.getCell();
519        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
520      }
521    }
522
523    private byte [] getRandomRow() {
524      return format(random.nextInt(totalRows));
525    }
526  }
527
528  static class GaussianRandomReadBenchmark extends ReadBenchmark {
529
530    private RandomData randomData = new RandomDataImpl();
531
532    public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs,
533        Path mf, int totalRows) {
534      super(conf, fs, mf, totalRows);
535    }
536
537    @Override
538    void doRow(int i) throws Exception {
539      HFileScanner scanner = this.reader.getScanner(conf, false, true);
540      byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
541      scanner.seekTo(createCell(gaussianRandomRowBytes));
542      for (int ii = 0; ii < 30; ii++) {
543        if (!scanner.next()) {
544          LOG.info("NOTHING FOLLOWS");
545          return;
546        }
547        // TODO: Fix. Make scanner do Cells.
548        scanner.getCell();
549      }
550    }
551
552    private byte [] getGaussianRandomRowBytes() {
553      int r = (int) randomData.nextGaussian((double)totalRows / 2.0,
554          (double)totalRows / 10.0);
555      // make sure r falls into [0,totalRows)
556      return format(Math.min(totalRows, Math.max(r,0)));
557    }
558  }
559
560  /**
561   * @param args
562   * @throws Exception
563   * @throws IOException
564   */
565  public static void main(String[] args) throws Exception {
566    new HFilePerformanceEvaluation().runBenchmarks();
567  }
568
569  private String getCipherName(Configuration conf, String cipherName) {
570    if (cipherName.equals("aes")) {
571      String provider = conf.get(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY);
572      if (provider == null || provider.equals("")
573              || provider.equals(DefaultCipherProvider.class.getName())) {
574        return "aes-default";
575      } else if (provider.equals(CryptoCipherProvider.class.getName())) {
576        return "aes-commons";
577      }
578    }
579    return cipherName;
580  }
581}