001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.security.SecureRandom;
023import java.util.Random;
024
025import org.apache.commons.math3.random.RandomData;
026import org.apache.commons.math3.random.RandomDataImpl;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
034import org.apache.hadoop.hbase.io.crypto.CryptoCipherProvider;
035import org.apache.hadoop.hbase.io.crypto.DefaultCipherProvider;
036import org.apache.hadoop.hbase.io.crypto.Encryption;
037import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
038import org.apache.hadoop.hbase.io.crypto.aes.AES;
039import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
040import org.apache.hadoop.hbase.io.hfile.CacheConfig;
041import org.apache.hadoop.hbase.io.hfile.HFile;
042import org.apache.hadoop.hbase.io.hfile.HFileContext;
043import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
044import org.apache.hadoop.hbase.io.hfile.HFileScanner;
045import org.apache.hadoop.hbase.util.Bytes;
046
047/**
048 * This class runs performance benchmarks for {@link HFile}.
049 */
050@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
051public class HFilePerformanceEvaluation {
052  private static final int ROW_LENGTH = 10;
053  private static final int ROW_COUNT = 1000000;
054  private static final int RFILE_BLOCKSIZE = 8 * 1024;
055  private static StringBuilder testSummary = new StringBuilder();
056  
057  // Disable verbose INFO logging from org.apache.hadoop.io.compress.CodecPool
058  static {
059    System.setProperty("org.apache.commons.logging.Log", 
060      "org.apache.commons.logging.impl.SimpleLog");
061    System.setProperty("org.apache.commons.logging.simplelog.log.org.apache.hadoop.io.compress.CodecPool",
062      "WARN");
063  }
064  
065  private static final Logger LOG =
066    LoggerFactory.getLogger(HFilePerformanceEvaluation.class.getName());
067
068  static byte [] format(final int i) {
069    String v = Integer.toString(i);
070    return Bytes.toBytes("0000000000".substring(v.length()) + v);
071  }
072
073  static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
074    w.set(format(i));
075    return w;
076  }
077
078  static Cell createCell(final int i) {
079    return createCell(i, HConstants.EMPTY_BYTE_ARRAY);
080  }
081
082  /**
083   * HFile is Cell-based. It used to be byte arrays.  Doing this test, pass Cells. All Cells
084   * intentionally have same coordinates in all fields but row.
085   * @param i Integer to format as a row Key.
086   * @param value Value to use
087   * @return Created Cell.
088   */
089  static Cell createCell(final int i, final byte [] value) {
090    return createCell(format(i), value);
091  }
092
093  static Cell createCell(final byte [] keyRow) {
094    return CellUtil.createCell(keyRow);
095  }
096
097  static Cell createCell(final byte [] keyRow, final byte [] value) {
098    return CellUtil.createCell(keyRow, value);
099  }
100
101  /**
102   * Add any supported codec or cipher to test the HFile read/write performance. 
103   * Specify "none" to disable codec or cipher or both.  
104   * @throws Exception
105   */
106  private void runBenchmarks() throws Exception {
107    final Configuration conf = new Configuration();
108    final FileSystem fs = FileSystem.get(conf);
109    final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
110    
111    // codec=none cipher=none
112    runWriteBenchmark(conf, fs, mf, "none", "none");
113    runReadBenchmark(conf, fs, mf, "none", "none");
114    
115    // codec=gz cipher=none
116    runWriteBenchmark(conf, fs, mf, "gz", "none");
117    runReadBenchmark(conf, fs, mf, "gz", "none");
118
119    // Add configuration for AES cipher
120    final Configuration aesconf = new Configuration();
121    aesconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
122    aesconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
123    aesconf.setInt("hfile.format.version", 3);
124    final FileSystem aesfs = FileSystem.get(aesconf);
125    final Path aesmf = aesfs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
126
127    // codec=none cipher=aes
128    runWriteBenchmark(aesconf, aesfs, aesmf, "none", "aes");
129    runReadBenchmark(aesconf, aesfs, aesmf, "none", "aes");
130
131    // codec=gz cipher=aes
132    runWriteBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
133    runReadBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
134
135    // Add configuration for Commons cipher
136    final Configuration cryptoconf = new Configuration();
137    cryptoconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
138    cryptoconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
139    cryptoconf.setInt("hfile.format.version", 3);
140    cryptoconf.set(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY, CryptoCipherProvider.class.getName());
141    final FileSystem cryptofs = FileSystem.get(cryptoconf);
142    final Path cryptof = cryptofs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
143
144    // codec=none cipher=aes
145    runWriteBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
146    runReadBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
147
148    // codec=gz cipher=aes
149    runWriteBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
150    runReadBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
151
152    // cleanup test files
153    if (fs.exists(mf)) {
154      fs.delete(mf, true);
155    }
156    if (aesfs.exists(aesmf)) {
157      aesfs.delete(aesmf, true);
158    }
159    if (cryptofs.exists(aesmf)) {
160      cryptofs.delete(cryptof, true);
161    }
162
163    // Print Result Summary
164    LOG.info("\n***************\n" + "Result Summary" + "\n***************\n");
165    LOG.info(testSummary.toString());
166
167  }
168
169  /**
170   * Write a test HFile with the given codec & cipher
171   * @param conf
172   * @param fs
173   * @param mf
174   * @param codec "none", "lzo", "gz", "snappy"
175   * @param cipher "none", "aes"
176   * @throws Exception
177   */
178  private void runWriteBenchmark(Configuration conf, FileSystem fs, Path mf, String codec,
179      String cipher) throws Exception {
180    if (fs.exists(mf)) {
181      fs.delete(mf, true);
182    }
183
184    runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT, codec, cipher),
185        ROW_COUNT, codec, getCipherName(conf, cipher));
186
187  }
188
189  /**
190   * Run all the read benchmarks for the test HFile 
191   * @param conf
192   * @param fs
193   * @param mf
194   * @param codec "none", "lzo", "gz", "snappy"
195   * @param cipher "none", "aes"
196   */
197  private void runReadBenchmark(final Configuration conf, final FileSystem fs, final Path mf,
198      final String codec, final String cipher) {
199    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
200      @Override
201      public void run() {
202        try {
203          runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
204            ROW_COUNT, codec, getCipherName(conf, cipher));
205        } catch (Exception e) {
206          testSummary.append("UniformRandomSmallScan failed " + e.getMessage());
207          e.printStackTrace();
208        }
209      }
210    });
211    
212    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
213      @Override
214      public void run() {
215        try {
216          runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
217              ROW_COUNT, codec, getCipherName(conf, cipher));
218        } catch (Exception e) {
219          testSummary.append("UniformRandomReadBenchmark failed " + e.getMessage());
220          e.printStackTrace();
221        }
222      }
223    });
224    
225    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
226      @Override
227      public void run() {
228        try {
229          runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
230              ROW_COUNT, codec, getCipherName(conf, cipher));
231        } catch (Exception e) {
232          testSummary.append("GaussianRandomReadBenchmark failed " + e.getMessage());
233          e.printStackTrace();
234        }
235      }
236    });
237    
238    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
239      @Override
240      public void run() {
241        try {
242          runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
243              ROW_COUNT, codec, getCipherName(conf, cipher));
244        } catch (Exception e) {
245          testSummary.append("SequentialReadBenchmark failed " + e.getMessage());
246          e.printStackTrace();
247        }
248      }
249    });    
250
251  }
252  
253  protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount,
254      String codec, String cipher) throws Exception {
255    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + 
256        codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows.");
257    
258    long elapsedTime = benchmark.run();
259    
260    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + 
261        codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows took " + 
262        elapsedTime + "ms.");
263    
264    // Store results to print summary at the end
265    testSummary.append("Running ").append(benchmark.getClass().getSimpleName())
266        .append(" with codec[").append(codec).append("] cipher[").append(cipher)
267        .append("] for ").append(rowCount).append(" rows took ").append(elapsedTime)
268        .append("ms.").append("\n");
269  }
270
271  static abstract class RowOrientedBenchmark {
272
273    protected final Configuration conf;
274    protected final FileSystem fs;
275    protected final Path mf;
276    protected final int totalRows;
277    protected String codec = "none";
278    protected String cipher = "none";
279
280    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
281        int totalRows, String codec, String cipher) {
282      this.conf = conf;
283      this.fs = fs;
284      this.mf = mf;
285      this.totalRows = totalRows;
286      this.codec = codec;
287      this.cipher = cipher;
288    }
289
290    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
291        int totalRows) {
292      this.conf = conf;
293      this.fs = fs;
294      this.mf = mf;
295      this.totalRows = totalRows;
296    }
297
298    void setUp() throws Exception {
299      // do nothing
300    }
301
302    abstract void doRow(int i) throws Exception;
303
304    protected int getReportingPeriod() {
305      return this.totalRows / 10;
306    }
307
308    void tearDown() throws Exception {
309      // do nothing
310    }
311
312    /**
313     * Run benchmark
314     * @return elapsed time.
315     * @throws Exception
316     */
317    long run() throws Exception {
318      long elapsedTime;
319      setUp();
320      long startTime = System.currentTimeMillis();
321      try {
322        for (int i = 0; i < totalRows; i++) {
323          if (i > 0 && i % getReportingPeriod() == 0) {
324            LOG.info("Processed " + i + " rows.");
325          }
326          doRow(i);
327        }
328        elapsedTime = System.currentTimeMillis() - startTime;
329      } finally {
330        tearDown();
331      }
332      return elapsedTime;
333    }
334
335  }
336
337  static class SequentialWriteBenchmark extends RowOrientedBenchmark {
338    protected HFile.Writer writer;
339    private Random random = new Random();
340    private byte[] bytes = new byte[ROW_LENGTH];
341
342    public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
343        int totalRows, String codec, String cipher) {
344      super(conf, fs, mf, totalRows, codec, cipher);
345    }
346
347    @Override
348    void setUp() throws Exception {
349
350      HFileContextBuilder builder = new HFileContextBuilder()
351          .withCompression(HFileWriterImpl.compressionByName(codec))
352          .withBlockSize(RFILE_BLOCKSIZE);
353      
354      if (cipher == "aes") {
355        byte[] cipherKey = new byte[AES.KEY_LENGTH];
356        new SecureRandom().nextBytes(cipherKey);
357        builder.withEncryptionContext(Encryption.newContext(conf)
358            .setCipher(Encryption.getCipher(conf, cipher))
359            .setKey(cipherKey));
360      } else if (!"none".equals(cipher)) {
361        throw new IOException("Cipher " + cipher + " not supported.");
362      }
363      
364      HFileContext hFileContext = builder.build();
365
366      writer = HFile.getWriterFactoryNoCache(conf)
367          .withPath(fs, mf)
368          .withFileContext(hFileContext)
369          .withComparator(CellComparator.getInstance())
370          .create();
371    }
372    
373    @Override
374    void doRow(int i) throws Exception {
375      writer.append(createCell(i, generateValue()));
376    }
377
378    private byte[] generateValue() {
379      random.nextBytes(bytes);
380      return bytes;
381    }
382
383    @Override
384    protected int getReportingPeriod() {
385      return this.totalRows; // don't report progress
386    }
387
388    @Override
389    void tearDown() throws Exception {
390      writer.close();
391    }
392
393  }
394
395  static abstract class ReadBenchmark extends RowOrientedBenchmark {
396
397    protected HFile.Reader reader;
398
399    public ReadBenchmark(Configuration conf, FileSystem fs, Path mf,
400        int totalRows) {
401      super(conf, fs, mf, totalRows);
402    }
403
404    @Override
405    void setUp() throws Exception {
406      reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), true, this.conf);
407      this.reader.loadFileInfo();
408    }
409
410    @Override
411    void tearDown() throws Exception {
412      reader.close();
413    }
414
415  }
416
417  static class SequentialReadBenchmark extends ReadBenchmark {
418    private HFileScanner scanner;
419
420    public SequentialReadBenchmark(Configuration conf, FileSystem fs,
421      Path mf, int totalRows) {
422      super(conf, fs, mf, totalRows);
423    }
424
425    @Override
426    void setUp() throws Exception {
427      super.setUp();
428      this.scanner = this.reader.getScanner(false, false);
429      this.scanner.seekTo();
430    }
431
432    @Override
433    void doRow(int i) throws Exception {
434      if (this.scanner.next()) {
435        // TODO: Fix. Make Scanner do Cells.
436        Cell c = this.scanner.getCell();
437        PerformanceEvaluationCommons.assertKey(format(i + 1), c);
438        PerformanceEvaluationCommons.assertValueSize(c.getValueLength(), ROW_LENGTH);
439      }
440    }
441
442    @Override
443    protected int getReportingPeriod() {
444      return this.totalRows; // don't report progress
445    }
446
447  }
448
449  static class UniformRandomReadBenchmark extends ReadBenchmark {
450
451    private Random random = new Random();
452
453    public UniformRandomReadBenchmark(Configuration conf, FileSystem fs,
454        Path mf, int totalRows) {
455      super(conf, fs, mf, totalRows);
456    }
457
458    @Override
459    void doRow(int i) throws Exception {
460      HFileScanner scanner = this.reader.getScanner(false, true);
461      byte [] b = getRandomRow();
462      if (scanner.seekTo(createCell(b)) < 0) {
463        LOG.info("Not able to seekTo " + new String(b));
464        return;
465      }
466      // TODO: Fix scanner so it does Cells
467      Cell c = scanner.getCell();
468      PerformanceEvaluationCommons.assertKey(b, c);
469      PerformanceEvaluationCommons.assertValueSize(c.getValueLength(), ROW_LENGTH);
470    }
471
472    private byte [] getRandomRow() {
473      return format(random.nextInt(totalRows));
474    }
475  }
476
477  static class UniformRandomSmallScan extends ReadBenchmark {
478    private Random random = new Random();
479
480    public UniformRandomSmallScan(Configuration conf, FileSystem fs,
481        Path mf, int totalRows) {
482      super(conf, fs, mf, totalRows/10);
483    }
484
485    @Override
486    void doRow(int i) throws Exception {
487      HFileScanner scanner = this.reader.getScanner(false, false);
488      byte [] b = getRandomRow();
489      // System.out.println("Random row: " + new String(b));
490      Cell c = createCell(b);
491      if (scanner.seekTo(c) != 0) {
492        LOG.info("Nonexistent row: " + new String(b));
493        return;
494      }
495      // TODO: HFileScanner doesn't do Cells yet. Temporary fix.
496      c = scanner.getCell();
497      // System.out.println("Found row: " +
498      //  new String(c.getRowArray(), c.getRowOffset(), c.getRowLength()));
499      PerformanceEvaluationCommons.assertKey(b, c);
500      for (int ii = 0; ii < 30; ii++) {
501        if (!scanner.next()) {
502          LOG.info("NOTHING FOLLOWS");
503          return;
504        }
505        c = scanner.getCell();
506        PerformanceEvaluationCommons.assertValueSize(c.getValueLength(), ROW_LENGTH);
507      }
508    }
509
510    private byte [] getRandomRow() {
511      return format(random.nextInt(totalRows));
512    }
513  }
514
515  static class GaussianRandomReadBenchmark extends ReadBenchmark {
516
517    private RandomData randomData = new RandomDataImpl();
518
519    public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs,
520        Path mf, int totalRows) {
521      super(conf, fs, mf, totalRows);
522    }
523
524    @Override
525    void doRow(int i) throws Exception {
526      HFileScanner scanner = this.reader.getScanner(false, true);
527      byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
528      scanner.seekTo(createCell(gaussianRandomRowBytes));
529      for (int ii = 0; ii < 30; ii++) {
530        if (!scanner.next()) {
531          LOG.info("NOTHING FOLLOWS");
532          return;
533        }
534        // TODO: Fix. Make scanner do Cells.
535        scanner.getCell();
536      }
537    }
538
539    private byte [] getGaussianRandomRowBytes() {
540      int r = (int) randomData.nextGaussian((double)totalRows / 2.0,
541          (double)totalRows / 10.0);
542      // make sure r falls into [0,totalRows)
543      return format(Math.min(totalRows, Math.max(r,0)));
544    }
545  }
546
547  /**
548   * @param args
549   * @throws Exception
550   * @throws IOException
551   */
552  public static void main(String[] args) throws Exception {
553    new HFilePerformanceEvaluation().runBenchmarks();
554  }
555
556  private String getCipherName(Configuration conf, String cipherName) {
557    if (cipherName.equals("aes")) {
558      String provider = conf.get(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY);
559      if (provider == null || provider.equals("")
560              || provider.equals(DefaultCipherProvider.class.getName())) {
561        return "aes-default";
562      } else if (provider.equals(CryptoCipherProvider.class.getName())) {
563        return "aes-commons";
564      }
565    }
566    return cipherName;
567  }
568}