001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.concurrent.ThreadLocalRandom;
022import org.apache.commons.math3.random.RandomData;
023import org.apache.commons.math3.random.RandomDataImpl;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
028import org.apache.hadoop.hbase.io.crypto.CryptoCipherProvider;
029import org.apache.hadoop.hbase.io.crypto.DefaultCipherProvider;
030import org.apache.hadoop.hbase.io.crypto.Encryption;
031import org.apache.hadoop.hbase.io.crypto.MockAesKeyProvider;
032import org.apache.hadoop.hbase.io.crypto.aes.AES;
033import org.apache.hadoop.hbase.io.hfile.CacheConfig;
034import org.apache.hadoop.hbase.io.hfile.HFile;
035import org.apache.hadoop.hbase.io.hfile.HFileContext;
036import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
037import org.apache.hadoop.hbase.io.hfile.HFileScanner;
038import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * This class runs performance benchmarks for {@link HFile}.
047 */
048@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
049public class HFilePerformanceEvaluation {
050  private static final int ROW_LENGTH = 10;
051  private static final int ROW_COUNT = 1000000;
052  private static final int RFILE_BLOCKSIZE = 8 * 1024;
053  private static StringBuilder testSummary = new StringBuilder();
054
055  // Disable verbose INFO logging from org.apache.hadoop.io.compress.CodecPool
056  static {
057    System.setProperty("org.apache.commons.logging.Log",
058      "org.apache.commons.logging.impl.SimpleLog");
059    System.setProperty(
060      "org.apache.commons.logging.simplelog.log.org.apache.hadoop.io.compress.CodecPool", "WARN");
061  }
062
063  private static final Logger LOG =
064    LoggerFactory.getLogger(HFilePerformanceEvaluation.class.getName());
065
066  static byte[] format(final int i) {
067    String v = Integer.toString(i);
068    return Bytes.toBytes("0000000000".substring(v.length()) + v);
069  }
070
071  static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
072    w.set(format(i));
073    return w;
074  }
075
076  static ExtendedCell createCell(final int i) {
077    return createCell(i, HConstants.EMPTY_BYTE_ARRAY);
078  }
079
080  /**
081   * HFile is Cell-based. It used to be byte arrays. Doing this test, pass Cells. All Cells
082   * intentionally have same coordinates in all fields but row.
083   * @param i     Integer to format as a row Key.
084   * @param value Value to use
085   * @return Created Cell.
086   */
087  static ExtendedCell createCell(final int i, final byte[] value) {
088    return createCell(format(i), value);
089  }
090
091  static ExtendedCell createCell(final byte[] keyRow) {
092    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(keyRow)
093      .setFamily(HConstants.EMPTY_BYTE_ARRAY).setQualifier(HConstants.EMPTY_BYTE_ARRAY)
094      .setTimestamp(HConstants.LATEST_TIMESTAMP).setType(KeyValue.Type.Maximum.getCode())
095      .setValue(HConstants.EMPTY_BYTE_ARRAY).build();
096  }
097
098  static ExtendedCell createCell(final byte[] keyRow, final byte[] value) {
099    return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(keyRow)
100      .setFamily(HConstants.EMPTY_BYTE_ARRAY).setQualifier(HConstants.EMPTY_BYTE_ARRAY)
101      .setTimestamp(HConstants.LATEST_TIMESTAMP).setType(KeyValue.Type.Maximum.getCode())
102      .setValue(value).build();
103  }
104
105  /**
106   * Add any supported codec or cipher to test the HFile read/write performance. Specify "none" to
107   * disable codec or cipher or both.
108   */
109  private void runBenchmarks() throws Exception {
110    final Configuration conf = new Configuration();
111    final FileSystem fs = FileSystem.get(conf);
112    final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
113
114    // codec=none cipher=none
115    runWriteBenchmark(conf, fs, mf, "none", "none");
116    runReadBenchmark(conf, fs, mf, "none", "none");
117
118    // codec=gz cipher=none
119    runWriteBenchmark(conf, fs, mf, "gz", "none");
120    runReadBenchmark(conf, fs, mf, "gz", "none");
121
122    // Add configuration for AES cipher
123    final Configuration aesconf = new Configuration();
124    aesconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, MockAesKeyProvider.class.getName());
125    aesconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
126    aesconf.setInt("hfile.format.version", 3);
127    final FileSystem aesfs = FileSystem.get(aesconf);
128    final Path aesmf = aesfs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
129
130    // codec=none cipher=aes
131    runWriteBenchmark(aesconf, aesfs, aesmf, "none", "aes");
132    runReadBenchmark(aesconf, aesfs, aesmf, "none", "aes");
133
134    // codec=gz cipher=aes
135    runWriteBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
136    runReadBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
137
138    // Add configuration for Commons cipher
139    final Configuration cryptoconf = new Configuration();
140    cryptoconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, MockAesKeyProvider.class.getName());
141    cryptoconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
142    cryptoconf.setInt("hfile.format.version", 3);
143    cryptoconf.set(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY, CryptoCipherProvider.class.getName());
144    final FileSystem cryptofs = FileSystem.get(cryptoconf);
145    final Path cryptof = cryptofs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
146
147    // codec=none cipher=aes
148    runWriteBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
149    runReadBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
150
151    // codec=gz cipher=aes
152    runWriteBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
153    runReadBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
154
155    // cleanup test files
156    if (fs.exists(mf)) {
157      fs.delete(mf, true);
158    }
159    if (aesfs.exists(aesmf)) {
160      aesfs.delete(aesmf, true);
161    }
162    if (cryptofs.exists(aesmf)) {
163      cryptofs.delete(cryptof, true);
164    }
165
166    // Print Result Summary
167    LOG.info("\n***************\n" + "Result Summary" + "\n***************\n");
168    LOG.info(testSummary.toString());
169
170  }
171
172  /**
173   * Write a test HFile with the given codec & cipher
174   * @param codec  "none", "lzo", "gz", "snappy"
175   * @param cipher "none", "aes"
176   */
177  private void runWriteBenchmark(Configuration conf, FileSystem fs, Path mf, String codec,
178    String cipher) throws Exception {
179    if (fs.exists(mf)) {
180      fs.delete(mf, true);
181    }
182
183    runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT, codec, cipher), ROW_COUNT,
184      codec, getCipherName(conf, cipher));
185
186  }
187
188  /**
189   * Run all the read benchmarks for the test HFile
190   * @param codec  "none", "lzo", "gz", "snappy"
191   * @param cipher "none", "aes"
192   */
193  private void runReadBenchmark(final Configuration conf, final FileSystem fs, final Path mf,
194    final String codec, final String cipher) {
195    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
196      @Override
197      public void run() {
198        try {
199          runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
200            getCipherName(conf, cipher));
201        } catch (Exception e) {
202          testSummary.append("UniformRandomSmallScan failed " + e.getMessage());
203          e.printStackTrace();
204        }
205      }
206    });
207
208    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
209      @Override
210      public void run() {
211        try {
212          runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
213            getCipherName(conf, cipher));
214        } catch (Exception e) {
215          testSummary.append("UniformRandomReadBenchmark failed " + e.getMessage());
216          e.printStackTrace();
217        }
218      }
219    });
220
221    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
222      @Override
223      public void run() {
224        try {
225          runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
226            getCipherName(conf, cipher));
227        } catch (Exception e) {
228          testSummary.append("GaussianRandomReadBenchmark failed " + e.getMessage());
229          e.printStackTrace();
230        }
231      }
232    });
233
234    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
235      @Override
236      public void run() {
237        try {
238          runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
239            getCipherName(conf, cipher));
240        } catch (Exception e) {
241          testSummary.append("SequentialReadBenchmark failed " + e.getMessage());
242          e.printStackTrace();
243        }
244      }
245    });
246
247  }
248
249  protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount, String codec,
250    String cipher) throws Exception {
251    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + codec + "] "
252      + "cipher[" + cipher + "] for " + rowCount + " rows.");
253
254    long elapsedTime = benchmark.run();
255
256    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + codec + "] "
257      + "cipher[" + cipher + "] for " + rowCount + " rows took " + elapsedTime + "ms.");
258
259    // Store results to print summary at the end
260    testSummary.append("Running ").append(benchmark.getClass().getSimpleName())
261      .append(" with codec[").append(codec).append("] cipher[").append(cipher).append("] for ")
262      .append(rowCount).append(" rows took ").append(elapsedTime).append("ms.").append("\n");
263  }
264
265  static abstract class RowOrientedBenchmark {
266
267    protected final Configuration conf;
268    protected final FileSystem fs;
269    protected final Path mf;
270    protected final int totalRows;
271    protected String codec = "none";
272    protected String cipher = "none";
273
274    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows,
275      String codec, String cipher) {
276      this.conf = conf;
277      this.fs = fs;
278      this.mf = mf;
279      this.totalRows = totalRows;
280      this.codec = codec;
281      this.cipher = cipher;
282    }
283
284    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
285      this.conf = conf;
286      this.fs = fs;
287      this.mf = mf;
288      this.totalRows = totalRows;
289    }
290
291    void setUp() throws Exception {
292      // do nothing
293    }
294
295    abstract void doRow(int i) throws Exception;
296
297    protected int getReportingPeriod() {
298      return this.totalRows / 10;
299    }
300
301    void tearDown() throws Exception {
302      // do nothing
303    }
304
305    /**
306     * Run benchmark
307     * @return elapsed time.
308     */
309    long run() throws Exception {
310      long elapsedTime;
311      setUp();
312      long startTime = EnvironmentEdgeManager.currentTime();
313      try {
314        for (int i = 0; i < totalRows; i++) {
315          if (i > 0 && i % getReportingPeriod() == 0) {
316            LOG.info("Processed " + i + " rows.");
317          }
318          doRow(i);
319        }
320        elapsedTime = EnvironmentEdgeManager.currentTime() - startTime;
321      } finally {
322        tearDown();
323      }
324      return elapsedTime;
325    }
326
327  }
328
329  static class SequentialWriteBenchmark extends RowOrientedBenchmark {
330    protected HFile.Writer writer;
331    private byte[] bytes = new byte[ROW_LENGTH];
332
333    public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows,
334      String codec, String cipher) {
335      super(conf, fs, mf, totalRows, codec, cipher);
336    }
337
338    @Override
339    void setUp() throws Exception {
340
341      HFileContextBuilder builder = new HFileContextBuilder()
342        .withCompression(HFileWriterImpl.compressionByName(codec)).withBlockSize(RFILE_BLOCKSIZE);
343
344      if (cipher == "aes") {
345        byte[] cipherKey = new byte[AES.KEY_LENGTH];
346        Bytes.secureRandom(cipherKey);
347        builder.withEncryptionContext(Encryption.newContext(conf)
348          .setCipher(Encryption.getCipher(conf, cipher)).setKey(cipherKey));
349      } else if (!"none".equals(cipher)) {
350        throw new IOException("Cipher " + cipher + " not supported.");
351      }
352
353      HFileContext hFileContext = builder.build();
354
355      writer =
356        HFile.getWriterFactoryNoCache(conf).withPath(fs, mf).withFileContext(hFileContext).create();
357    }
358
359    @Override
360    void doRow(int i) throws Exception {
361      writer.append(createCell(i, generateValue()));
362    }
363
364    private byte[] generateValue() {
365      Bytes.random(bytes);
366      return bytes;
367    }
368
369    @Override
370    protected int getReportingPeriod() {
371      return this.totalRows; // don't report progress
372    }
373
374    @Override
375    void tearDown() throws Exception {
376      writer.close();
377    }
378
379  }
380
381  static abstract class ReadBenchmark extends RowOrientedBenchmark {
382
383    protected HFile.Reader reader;
384
385    public ReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
386      super(conf, fs, mf, totalRows);
387    }
388
389    @Override
390    void setUp() throws Exception {
391      reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), true, this.conf);
392    }
393
394    @Override
395    void tearDown() throws Exception {
396      reader.close();
397    }
398
399  }
400
401  static class SequentialReadBenchmark extends ReadBenchmark {
402    private HFileScanner scanner;
403
404    public SequentialReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
405      super(conf, fs, mf, totalRows);
406    }
407
408    @Override
409    void setUp() throws Exception {
410      super.setUp();
411      this.scanner = this.reader.getScanner(conf, false, false);
412      this.scanner.seekTo();
413    }
414
415    @Override
416    void doRow(int i) throws Exception {
417      if (this.scanner.next()) {
418        // TODO: Fix. Make Scanner do Cells.
419        Cell c = this.scanner.getCell();
420        PerformanceEvaluationCommons.assertKey(format(i + 1), c);
421        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
422      }
423    }
424
425    @Override
426    protected int getReportingPeriod() {
427      return this.totalRows; // don't report progress
428    }
429
430  }
431
432  static class UniformRandomReadBenchmark extends ReadBenchmark {
433
434    public UniformRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
435      super(conf, fs, mf, totalRows);
436    }
437
438    @Override
439    void doRow(int i) throws Exception {
440      HFileScanner scanner = this.reader.getScanner(conf, false, true);
441      byte[] b = getRandomRow();
442      if (scanner.seekTo(createCell(b)) < 0) {
443        LOG.info("Not able to seekTo " + new String(b));
444        return;
445      }
446      // TODO: Fix scanner so it does Cells
447      Cell c = scanner.getCell();
448      PerformanceEvaluationCommons.assertKey(b, c);
449      PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
450    }
451
452    private byte[] getRandomRow() {
453      return format(ThreadLocalRandom.current().nextInt(totalRows));
454    }
455  }
456
457  static class UniformRandomSmallScan extends ReadBenchmark {
458
459    public UniformRandomSmallScan(Configuration conf, FileSystem fs, Path mf, int totalRows) {
460      super(conf, fs, mf, totalRows / 10);
461    }
462
463    @Override
464    void doRow(int i) throws Exception {
465      HFileScanner scanner = this.reader.getScanner(conf, false, false);
466      byte[] b = getRandomRow();
467      // System.out.println("Random row: " + new String(b));
468      ExtendedCell c = createCell(b);
469      if (scanner.seekTo(c) != 0) {
470        LOG.info("Nonexistent row: " + new String(b));
471        return;
472      }
473      // TODO: HFileScanner doesn't do Cells yet. Temporary fix.
474      c = scanner.getCell();
475      // System.out.println("Found row: " +
476      // new String(c.getRowArray(), c.getRowOffset(), c.getRowLength()));
477      PerformanceEvaluationCommons.assertKey(b, c);
478      for (int ii = 0; ii < 30; ii++) {
479        if (!scanner.next()) {
480          LOG.info("NOTHING FOLLOWS");
481          return;
482        }
483        c = scanner.getCell();
484        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
485      }
486    }
487
488    private byte[] getRandomRow() {
489      return format(ThreadLocalRandom.current().nextInt(totalRows));
490    }
491  }
492
493  static class GaussianRandomReadBenchmark extends ReadBenchmark {
494
495    private RandomData randomData = new RandomDataImpl();
496
497    public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
498      super(conf, fs, mf, totalRows);
499    }
500
501    @Override
502    void doRow(int i) throws Exception {
503      HFileScanner scanner = this.reader.getScanner(conf, false, true);
504      byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
505      scanner.seekTo(createCell(gaussianRandomRowBytes));
506      for (int ii = 0; ii < 30; ii++) {
507        if (!scanner.next()) {
508          LOG.info("NOTHING FOLLOWS");
509          return;
510        }
511        // TODO: Fix. Make scanner do Cells.
512        scanner.getCell();
513      }
514    }
515
516    private byte[] getGaussianRandomRowBytes() {
517      int r = (int) randomData.nextGaussian((double) totalRows / 2.0, (double) totalRows / 10.0);
518      // make sure r falls into [0,totalRows)
519      return format(Math.min(totalRows, Math.max(r, 0)));
520    }
521  }
522
523  /**
524   *   */
525  public static void main(String[] args) throws Exception {
526    new HFilePerformanceEvaluation().runBenchmarks();
527  }
528
529  private String getCipherName(Configuration conf, String cipherName) {
530    if (cipherName.equals("aes")) {
531      String provider = conf.get(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY);
532      if (
533        provider == null || provider.equals("")
534          || provider.equals(DefaultCipherProvider.class.getName())
535      ) {
536        return "aes-default";
537      } else if (provider.equals(CryptoCipherProvider.class.getName())) {
538        return "aes-commons";
539      }
540    }
541    return cipherName;
542  }
543}