001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.concurrent.ThreadLocalRandom;
022import org.apache.commons.math3.random.RandomData;
023import org.apache.commons.math3.random.RandomDataImpl;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
028import org.apache.hadoop.hbase.io.crypto.CryptoCipherProvider;
029import org.apache.hadoop.hbase.io.crypto.DefaultCipherProvider;
030import org.apache.hadoop.hbase.io.crypto.Encryption;
031import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
032import org.apache.hadoop.hbase.io.crypto.aes.AES;
033import org.apache.hadoop.hbase.io.hfile.CacheConfig;
034import org.apache.hadoop.hbase.io.hfile.HFile;
035import org.apache.hadoop.hbase.io.hfile.HFileContext;
036import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
037import org.apache.hadoop.hbase.io.hfile.HFileScanner;
038import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * This class runs performance benchmarks for {@link HFile}.
047 */
048@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
049public class HFilePerformanceEvaluation {
  // Length in bytes of every generated cell value; the read benchmarks assert this value size.
  private static final int ROW_LENGTH = 10;
  // Number of rows handed to the write benchmark and to each read benchmark.
  private static final int ROW_COUNT = 1000000;
  // Block size passed to the HFileContextBuilder when the test file is written.
  private static final int RFILE_BLOCKSIZE = 8 * 1024;
  // Collects a result entry for each benchmark run; logged as the result summary at the end.
  private static StringBuilder testSummary = new StringBuilder();

  // Disable verbose INFO logging from org.apache.hadoop.io.compress.CodecPool
  static {
    System.setProperty("org.apache.commons.logging.Log",
      "org.apache.commons.logging.impl.SimpleLog");
    System.setProperty(
      "org.apache.commons.logging.simplelog.log.org.apache.hadoop.io.compress.CodecPool", "WARN");
  }

  // Declared after the static initializer above, so the logging properties are set first.
  private static final Logger LOG =
    LoggerFactory.getLogger(HFilePerformanceEvaluation.class.getName());
065
066  static byte[] format(final int i) {
067    String v = Integer.toString(i);
068    return Bytes.toBytes("0000000000".substring(v.length()) + v);
069  }
070
071  static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
072    w.set(format(i));
073    return w;
074  }
075
076  static Cell createCell(final int i) {
077    return createCell(i, HConstants.EMPTY_BYTE_ARRAY);
078  }
079
080  /**
081   * HFile is Cell-based. It used to be byte arrays. Doing this test, pass Cells. All Cells
082   * intentionally have same coordinates in all fields but row.
083   * @param i     Integer to format as a row Key.
084   * @param value Value to use
085   * @return Created Cell.
086   */
087  static Cell createCell(final int i, final byte[] value) {
088    return createCell(format(i), value);
089  }
090
091  static Cell createCell(final byte[] keyRow) {
092    return CellUtil.createCell(keyRow);
093  }
094
095  static Cell createCell(final byte[] keyRow, final byte[] value) {
096    return CellUtil.createCell(keyRow, value);
097  }
098
099  /**
100   * Add any supported codec or cipher to test the HFile read/write performance. Specify "none" to
101   * disable codec or cipher or both. n
102   */
103  private void runBenchmarks() throws Exception {
104    final Configuration conf = new Configuration();
105    final FileSystem fs = FileSystem.get(conf);
106    final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
107
108    // codec=none cipher=none
109    runWriteBenchmark(conf, fs, mf, "none", "none");
110    runReadBenchmark(conf, fs, mf, "none", "none");
111
112    // codec=gz cipher=none
113    runWriteBenchmark(conf, fs, mf, "gz", "none");
114    runReadBenchmark(conf, fs, mf, "gz", "none");
115
116    // Add configuration for AES cipher
117    final Configuration aesconf = new Configuration();
118    aesconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
119    aesconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
120    aesconf.setInt("hfile.format.version", 3);
121    final FileSystem aesfs = FileSystem.get(aesconf);
122    final Path aesmf = aesfs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
123
124    // codec=none cipher=aes
125    runWriteBenchmark(aesconf, aesfs, aesmf, "none", "aes");
126    runReadBenchmark(aesconf, aesfs, aesmf, "none", "aes");
127
128    // codec=gz cipher=aes
129    runWriteBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
130    runReadBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
131
132    // Add configuration for Commons cipher
133    final Configuration cryptoconf = new Configuration();
134    cryptoconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
135    cryptoconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
136    cryptoconf.setInt("hfile.format.version", 3);
137    cryptoconf.set(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY, CryptoCipherProvider.class.getName());
138    final FileSystem cryptofs = FileSystem.get(cryptoconf);
139    final Path cryptof = cryptofs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
140
141    // codec=none cipher=aes
142    runWriteBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
143    runReadBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
144
145    // codec=gz cipher=aes
146    runWriteBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
147    runReadBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
148
149    // cleanup test files
150    if (fs.exists(mf)) {
151      fs.delete(mf, true);
152    }
153    if (aesfs.exists(aesmf)) {
154      aesfs.delete(aesmf, true);
155    }
156    if (cryptofs.exists(aesmf)) {
157      cryptofs.delete(cryptof, true);
158    }
159
160    // Print Result Summary
161    LOG.info("\n***************\n" + "Result Summary" + "\n***************\n");
162    LOG.info(testSummary.toString());
163
164  }
165
166  /**
167   * Write a test HFile with the given codec & cipher nnn * @param codec "none", "lzo", "gz",
168   * "snappy"
169   * @param cipher "none", "aes" n
170   */
171  private void runWriteBenchmark(Configuration conf, FileSystem fs, Path mf, String codec,
172    String cipher) throws Exception {
173    if (fs.exists(mf)) {
174      fs.delete(mf, true);
175    }
176
177    runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT, codec, cipher), ROW_COUNT,
178      codec, getCipherName(conf, cipher));
179
180  }
181
182  /**
183   * Run all the read benchmarks for the test HFile nnn * @param codec "none", "lzo", "gz", "snappy"
184   * @param cipher "none", "aes"
185   */
186  private void runReadBenchmark(final Configuration conf, final FileSystem fs, final Path mf,
187    final String codec, final String cipher) {
188    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
189      @Override
190      public void run() {
191        try {
192          runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
193            getCipherName(conf, cipher));
194        } catch (Exception e) {
195          testSummary.append("UniformRandomSmallScan failed " + e.getMessage());
196          e.printStackTrace();
197        }
198      }
199    });
200
201    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
202      @Override
203      public void run() {
204        try {
205          runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
206            getCipherName(conf, cipher));
207        } catch (Exception e) {
208          testSummary.append("UniformRandomReadBenchmark failed " + e.getMessage());
209          e.printStackTrace();
210        }
211      }
212    });
213
214    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
215      @Override
216      public void run() {
217        try {
218          runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
219            getCipherName(conf, cipher));
220        } catch (Exception e) {
221          testSummary.append("GaussianRandomReadBenchmark failed " + e.getMessage());
222          e.printStackTrace();
223        }
224      }
225    });
226
227    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
228      @Override
229      public void run() {
230        try {
231          runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT), ROW_COUNT, codec,
232            getCipherName(conf, cipher));
233        } catch (Exception e) {
234          testSummary.append("SequentialReadBenchmark failed " + e.getMessage());
235          e.printStackTrace();
236        }
237      }
238    });
239
240  }
241
242  protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount, String codec,
243    String cipher) throws Exception {
244    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + codec + "] "
245      + "cipher[" + cipher + "] for " + rowCount + " rows.");
246
247    long elapsedTime = benchmark.run();
248
249    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + codec + "] "
250      + "cipher[" + cipher + "] for " + rowCount + " rows took " + elapsedTime + "ms.");
251
252    // Store results to print summary at the end
253    testSummary.append("Running ").append(benchmark.getClass().getSimpleName())
254      .append(" with codec[").append(codec).append("] cipher[").append(cipher).append("] for ")
255      .append(rowCount).append(" rows took ").append(elapsedTime).append("ms.").append("\n");
256  }
257
258  static abstract class RowOrientedBenchmark {
259
260    protected final Configuration conf;
261    protected final FileSystem fs;
262    protected final Path mf;
263    protected final int totalRows;
264    protected String codec = "none";
265    protected String cipher = "none";
266
267    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows,
268      String codec, String cipher) {
269      this.conf = conf;
270      this.fs = fs;
271      this.mf = mf;
272      this.totalRows = totalRows;
273      this.codec = codec;
274      this.cipher = cipher;
275    }
276
277    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
278      this.conf = conf;
279      this.fs = fs;
280      this.mf = mf;
281      this.totalRows = totalRows;
282    }
283
284    void setUp() throws Exception {
285      // do nothing
286    }
287
288    abstract void doRow(int i) throws Exception;
289
290    protected int getReportingPeriod() {
291      return this.totalRows / 10;
292    }
293
294    void tearDown() throws Exception {
295      // do nothing
296    }
297
298    /**
299     * Run benchmark
300     * @return elapsed time. n
301     */
302    long run() throws Exception {
303      long elapsedTime;
304      setUp();
305      long startTime = EnvironmentEdgeManager.currentTime();
306      try {
307        for (int i = 0; i < totalRows; i++) {
308          if (i > 0 && i % getReportingPeriod() == 0) {
309            LOG.info("Processed " + i + " rows.");
310          }
311          doRow(i);
312        }
313        elapsedTime = EnvironmentEdgeManager.currentTime() - startTime;
314      } finally {
315        tearDown();
316      }
317      return elapsedTime;
318    }
319
320  }
321
322  static class SequentialWriteBenchmark extends RowOrientedBenchmark {
323    protected HFile.Writer writer;
324    private byte[] bytes = new byte[ROW_LENGTH];
325
326    public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows,
327      String codec, String cipher) {
328      super(conf, fs, mf, totalRows, codec, cipher);
329    }
330
331    @Override
332    void setUp() throws Exception {
333
334      HFileContextBuilder builder = new HFileContextBuilder()
335        .withCompression(HFileWriterImpl.compressionByName(codec)).withBlockSize(RFILE_BLOCKSIZE);
336
337      if (cipher == "aes") {
338        byte[] cipherKey = new byte[AES.KEY_LENGTH];
339        Bytes.secureRandom(cipherKey);
340        builder.withEncryptionContext(Encryption.newContext(conf)
341          .setCipher(Encryption.getCipher(conf, cipher)).setKey(cipherKey));
342      } else if (!"none".equals(cipher)) {
343        throw new IOException("Cipher " + cipher + " not supported.");
344      }
345
346      HFileContext hFileContext = builder.build();
347
348      writer =
349        HFile.getWriterFactoryNoCache(conf).withPath(fs, mf).withFileContext(hFileContext).create();
350    }
351
352    @Override
353    void doRow(int i) throws Exception {
354      writer.append(createCell(i, generateValue()));
355    }
356
357    private byte[] generateValue() {
358      Bytes.random(bytes);
359      return bytes;
360    }
361
362    @Override
363    protected int getReportingPeriod() {
364      return this.totalRows; // don't report progress
365    }
366
367    @Override
368    void tearDown() throws Exception {
369      writer.close();
370    }
371
372  }
373
374  static abstract class ReadBenchmark extends RowOrientedBenchmark {
375
376    protected HFile.Reader reader;
377
378    public ReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
379      super(conf, fs, mf, totalRows);
380    }
381
382    @Override
383    void setUp() throws Exception {
384      reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), true, this.conf);
385    }
386
387    @Override
388    void tearDown() throws Exception {
389      reader.close();
390    }
391
392  }
393
394  static class SequentialReadBenchmark extends ReadBenchmark {
395    private HFileScanner scanner;
396
397    public SequentialReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
398      super(conf, fs, mf, totalRows);
399    }
400
401    @Override
402    void setUp() throws Exception {
403      super.setUp();
404      this.scanner = this.reader.getScanner(conf, false, false);
405      this.scanner.seekTo();
406    }
407
408    @Override
409    void doRow(int i) throws Exception {
410      if (this.scanner.next()) {
411        // TODO: Fix. Make Scanner do Cells.
412        Cell c = this.scanner.getCell();
413        PerformanceEvaluationCommons.assertKey(format(i + 1), c);
414        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
415      }
416    }
417
418    @Override
419    protected int getReportingPeriod() {
420      return this.totalRows; // don't report progress
421    }
422
423  }
424
425  static class UniformRandomReadBenchmark extends ReadBenchmark {
426
427    public UniformRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
428      super(conf, fs, mf, totalRows);
429    }
430
431    @Override
432    void doRow(int i) throws Exception {
433      HFileScanner scanner = this.reader.getScanner(conf, false, true);
434      byte[] b = getRandomRow();
435      if (scanner.seekTo(createCell(b)) < 0) {
436        LOG.info("Not able to seekTo " + new String(b));
437        return;
438      }
439      // TODO: Fix scanner so it does Cells
440      Cell c = scanner.getCell();
441      PerformanceEvaluationCommons.assertKey(b, c);
442      PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
443    }
444
445    private byte[] getRandomRow() {
446      return format(ThreadLocalRandom.current().nextInt(totalRows));
447    }
448  }
449
450  static class UniformRandomSmallScan extends ReadBenchmark {
451
452    public UniformRandomSmallScan(Configuration conf, FileSystem fs, Path mf, int totalRows) {
453      super(conf, fs, mf, totalRows / 10);
454    }
455
456    @Override
457    void doRow(int i) throws Exception {
458      HFileScanner scanner = this.reader.getScanner(conf, false, false);
459      byte[] b = getRandomRow();
460      // System.out.println("Random row: " + new String(b));
461      Cell c = createCell(b);
462      if (scanner.seekTo(c) != 0) {
463        LOG.info("Nonexistent row: " + new String(b));
464        return;
465      }
466      // TODO: HFileScanner doesn't do Cells yet. Temporary fix.
467      c = scanner.getCell();
468      // System.out.println("Found row: " +
469      // new String(c.getRowArray(), c.getRowOffset(), c.getRowLength()));
470      PerformanceEvaluationCommons.assertKey(b, c);
471      for (int ii = 0; ii < 30; ii++) {
472        if (!scanner.next()) {
473          LOG.info("NOTHING FOLLOWS");
474          return;
475        }
476        c = scanner.getCell();
477        PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, c.getValueLength());
478      }
479    }
480
481    private byte[] getRandomRow() {
482      return format(ThreadLocalRandom.current().nextInt(totalRows));
483    }
484  }
485
486  static class GaussianRandomReadBenchmark extends ReadBenchmark {
487
488    private RandomData randomData = new RandomDataImpl();
489
490    public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs, Path mf, int totalRows) {
491      super(conf, fs, mf, totalRows);
492    }
493
494    @Override
495    void doRow(int i) throws Exception {
496      HFileScanner scanner = this.reader.getScanner(conf, false, true);
497      byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
498      scanner.seekTo(createCell(gaussianRandomRowBytes));
499      for (int ii = 0; ii < 30; ii++) {
500        if (!scanner.next()) {
501          LOG.info("NOTHING FOLLOWS");
502          return;
503        }
504        // TODO: Fix. Make scanner do Cells.
505        scanner.getCell();
506      }
507    }
508
509    private byte[] getGaussianRandomRowBytes() {
510      int r = (int) randomData.nextGaussian((double) totalRows / 2.0, (double) totalRows / 10.0);
511      // make sure r falls into [0,totalRows)
512      return format(Math.min(totalRows, Math.max(r, 0)));
513    }
514  }
515
516  /**
517   * nnn
518   */
519  public static void main(String[] args) throws Exception {
520    new HFilePerformanceEvaluation().runBenchmarks();
521  }
522
523  private String getCipherName(Configuration conf, String cipherName) {
524    if (cipherName.equals("aes")) {
525      String provider = conf.get(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY);
526      if (
527        provider == null || provider.equals("")
528          || provider.equals(DefaultCipherProvider.class.getName())
529      ) {
530        return "aes-default";
531      } else if (provider.equals(CryptoCipherProvider.class.getName())) {
532        return "aes-commons";
533      }
534    }
535    return cipherName;
536  }
537}