001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.security.SecureRandom;
023import java.util.Random;
024
025import org.apache.commons.math3.random.RandomData;
026import org.apache.commons.math3.random.RandomDataImpl;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
034import org.apache.hadoop.hbase.io.crypto.CryptoCipherProvider;
035import org.apache.hadoop.hbase.io.crypto.DefaultCipherProvider;
036import org.apache.hadoop.hbase.io.crypto.Encryption;
037import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
038import org.apache.hadoop.hbase.io.crypto.aes.AES;
039import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
040import org.apache.hadoop.hbase.io.hfile.CacheConfig;
041import org.apache.hadoop.hbase.io.hfile.HFile;
042import org.apache.hadoop.hbase.io.hfile.HFileContext;
043import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
044import org.apache.hadoop.hbase.io.hfile.HFileScanner;
045import org.apache.hadoop.hbase.util.Bytes;
046
047/**
048 * This class runs performance benchmarks for {@link HFile}.
049 */
050@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
051public class HFilePerformanceEvaluation {
  // Size in bytes of the random value written for every row; the read benchmarks assert that
  // each cell value they see has this length.
  private static final int ROW_LENGTH = 10;
  // Number of rows passed to every benchmark as its row count.
  private static final int ROW_COUNT = 1000000;
  // Block size handed to the HFile context when the test file is written.
  private static final int RFILE_BLOCKSIZE = 8 * 1024;
  // Collects one line per benchmark run; logged as the result summary at the end of
  // runBenchmarks().
  private static StringBuilder testSummary = new StringBuilder();
  
  // Disable verbose INFO logging from org.apache.hadoop.io.compress.CodecPool
  // These system properties are set in a static initializer that precedes the LOG field below,
  // so they are in place before the logger is created.
  static {
    System.setProperty("org.apache.commons.logging.Log", 
      "org.apache.commons.logging.impl.SimpleLog");
    System.setProperty("org.apache.commons.logging.simplelog.log.org.apache.hadoop.io.compress.CodecPool",
      "WARN");
  }
  
  // Logger shared by the outer class and all nested benchmark classes.
  private static final Logger LOG =
    LoggerFactory.getLogger(HFilePerformanceEvaluation.class.getName());
067
068  static byte [] format(final int i) {
069    String v = Integer.toString(i);
070    return Bytes.toBytes("0000000000".substring(v.length()) + v);
071  }
072
073  static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
074    w.set(format(i));
075    return w;
076  }
077
078  static Cell createCell(final int i) {
079    return createCell(i, HConstants.EMPTY_BYTE_ARRAY);
080  }
081
082  /**
083   * HFile is Cell-based. It used to be byte arrays.  Doing this test, pass Cells. All Cells
084   * intentionally have same coordinates in all fields but row.
085   * @param i Integer to format as a row Key.
086   * @param value Value to use
087   * @return Created Cell.
088   */
089  static Cell createCell(final int i, final byte [] value) {
090    return createCell(format(i), value);
091  }
092
093  static Cell createCell(final byte [] keyRow) {
094    return CellUtil.createCell(keyRow);
095  }
096
097  static Cell createCell(final byte [] keyRow, final byte [] value) {
098    return CellUtil.createCell(keyRow, value);
099  }
100
101  /**
102   * Add any supported codec or cipher to test the HFile read/write performance. 
103   * Specify "none" to disable codec or cipher or both.  
104   * @throws Exception
105   */
106  private void runBenchmarks() throws Exception {
107    final Configuration conf = new Configuration();
108    final FileSystem fs = FileSystem.get(conf);
109    final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
110    
111    // codec=none cipher=none
112    runWriteBenchmark(conf, fs, mf, "none", "none");
113    runReadBenchmark(conf, fs, mf, "none", "none");
114    
115    // codec=gz cipher=none
116    runWriteBenchmark(conf, fs, mf, "gz", "none");
117    runReadBenchmark(conf, fs, mf, "gz", "none");
118
119    // Add configuration for AES cipher
120    final Configuration aesconf = new Configuration();
121    aesconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
122    aesconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
123    aesconf.setInt("hfile.format.version", 3);
124    final FileSystem aesfs = FileSystem.get(aesconf);
125    final Path aesmf = aesfs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
126
127    // codec=none cipher=aes
128    runWriteBenchmark(aesconf, aesfs, aesmf, "none", "aes");
129    runReadBenchmark(aesconf, aesfs, aesmf, "none", "aes");
130
131    // codec=gz cipher=aes
132    runWriteBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
133    runReadBenchmark(aesconf, aesfs, aesmf, "gz", "aes");
134
135    // Add configuration for Commons cipher
136    final Configuration cryptoconf = new Configuration();
137    cryptoconf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
138    cryptoconf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
139    cryptoconf.setInt("hfile.format.version", 3);
140    cryptoconf.set(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY, CryptoCipherProvider.class.getName());
141    final FileSystem cryptofs = FileSystem.get(cryptoconf);
142    final Path cryptof = cryptofs.makeQualified(new Path("performanceevaluation.aes.mapfile"));
143
144    // codec=none cipher=aes
145    runWriteBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
146    runReadBenchmark(cryptoconf, cryptofs, aesmf, "none", "aes");
147
148    // codec=gz cipher=aes
149    runWriteBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
150    runReadBenchmark(cryptoconf, aesfs, aesmf, "gz", "aes");
151
152    // cleanup test files
153    if (fs.exists(mf)) {
154      fs.delete(mf, true);
155    }
156    if (aesfs.exists(aesmf)) {
157      aesfs.delete(aesmf, true);
158    }
159    if (cryptofs.exists(aesmf)) {
160      cryptofs.delete(cryptof, true);
161    }
162
163    // Print Result Summary
164    LOG.info("\n***************\n" + "Result Summary" + "\n***************\n");
165    LOG.info(testSummary.toString());
166
167  }
168
169  /**
170   * Write a test HFile with the given codec & cipher
171   * @param conf
172   * @param fs
173   * @param mf
174   * @param codec "none", "lzo", "gz", "snappy"
175   * @param cipher "none", "aes"
176   * @throws Exception
177   */
178  private void runWriteBenchmark(Configuration conf, FileSystem fs, Path mf, String codec,
179      String cipher) throws Exception {
180    if (fs.exists(mf)) {
181      fs.delete(mf, true);
182    }
183
184    runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT, codec, cipher),
185        ROW_COUNT, codec, getCipherName(conf, cipher));
186
187  }
188
189  /**
190   * Run all the read benchmarks for the test HFile 
191   * @param conf
192   * @param fs
193   * @param mf
194   * @param codec "none", "lzo", "gz", "snappy"
195   * @param cipher "none", "aes"
196   */
197  private void runReadBenchmark(final Configuration conf, final FileSystem fs, final Path mf,
198      final String codec, final String cipher) {
199    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
200      @Override
201      public void run() {
202        try {
203          runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
204            ROW_COUNT, codec, getCipherName(conf, cipher));
205        } catch (Exception e) {
206          testSummary.append("UniformRandomSmallScan failed " + e.getMessage());
207          e.printStackTrace();
208        }
209      }
210    });
211    
212    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
213      @Override
214      public void run() {
215        try {
216          runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
217              ROW_COUNT, codec, getCipherName(conf, cipher));
218        } catch (Exception e) {
219          testSummary.append("UniformRandomReadBenchmark failed " + e.getMessage());
220          e.printStackTrace();
221        }
222      }
223    });
224    
225    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
226      @Override
227      public void run() {
228        try {
229          runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
230              ROW_COUNT, codec, getCipherName(conf, cipher));
231        } catch (Exception e) {
232          testSummary.append("GaussianRandomReadBenchmark failed " + e.getMessage());
233          e.printStackTrace();
234        }
235      }
236    });
237    
238    PerformanceEvaluationCommons.concurrentReads(new Runnable() {
239      @Override
240      public void run() {
241        try {
242          runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
243              ROW_COUNT, codec, getCipherName(conf, cipher));
244        } catch (Exception e) {
245          testSummary.append("SequentialReadBenchmark failed " + e.getMessage());
246          e.printStackTrace();
247        }
248      }
249    });    
250
251  }
252  
253  protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount,
254      String codec, String cipher) throws Exception {
255    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + 
256        codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows.");
257    
258    long elapsedTime = benchmark.run();
259    
260    LOG.info("Running " + benchmark.getClass().getSimpleName() + " with codec[" + 
261        codec + "] " + "cipher[" + cipher + "] for " + rowCount + " rows took " + 
262        elapsedTime + "ms.");
263    
264    // Store results to print summary at the end
265    testSummary.append("Running ").append(benchmark.getClass().getSimpleName())
266        .append(" with codec[").append(codec).append("] cipher[").append(cipher)
267        .append("] for ").append(rowCount).append(" rows took ").append(elapsedTime)
268        .append("ms.").append("\n");
269  }
270
271  static abstract class RowOrientedBenchmark {
272
273    protected final Configuration conf;
274    protected final FileSystem fs;
275    protected final Path mf;
276    protected final int totalRows;
277    protected String codec = "none";
278    protected String cipher = "none";
279
280    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
281        int totalRows, String codec, String cipher) {
282      this.conf = conf;
283      this.fs = fs;
284      this.mf = mf;
285      this.totalRows = totalRows;
286      this.codec = codec;
287      this.cipher = cipher;
288    }
289
290    public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
291        int totalRows) {
292      this.conf = conf;
293      this.fs = fs;
294      this.mf = mf;
295      this.totalRows = totalRows;
296    }
297
298    void setUp() throws Exception {
299      // do nothing
300    }
301
302    abstract void doRow(int i) throws Exception;
303
304    protected int getReportingPeriod() {
305      return this.totalRows / 10;
306    }
307
308    void tearDown() throws Exception {
309      // do nothing
310    }
311
312    /**
313     * Run benchmark
314     * @return elapsed time.
315     * @throws Exception
316     */
317    long run() throws Exception {
318      long elapsedTime;
319      setUp();
320      long startTime = System.currentTimeMillis();
321      try {
322        for (int i = 0; i < totalRows; i++) {
323          if (i > 0 && i % getReportingPeriod() == 0) {
324            LOG.info("Processed " + i + " rows.");
325          }
326          doRow(i);
327        }
328        elapsedTime = System.currentTimeMillis() - startTime;
329      } finally {
330        tearDown();
331      }
332      return elapsedTime;
333    }
334
335  }
336
337  static class SequentialWriteBenchmark extends RowOrientedBenchmark {
338    protected HFile.Writer writer;
339    private Random random = new Random();
340    private byte[] bytes = new byte[ROW_LENGTH];
341
342    public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
343        int totalRows, String codec, String cipher) {
344      super(conf, fs, mf, totalRows, codec, cipher);
345    }
346
347    @Override
348    void setUp() throws Exception {
349
350      HFileContextBuilder builder = new HFileContextBuilder()
351          .withCompression(HFileWriterImpl.compressionByName(codec))
352          .withBlockSize(RFILE_BLOCKSIZE);
353      
354      if (cipher == "aes") {
355        byte[] cipherKey = new byte[AES.KEY_LENGTH];
356        new SecureRandom().nextBytes(cipherKey);
357        builder.withEncryptionContext(Encryption.newContext(conf)
358            .setCipher(Encryption.getCipher(conf, cipher))
359            .setKey(cipherKey));
360      } else if (!"none".equals(cipher)) {
361        throw new IOException("Cipher " + cipher + " not supported.");
362      }
363      
364      HFileContext hFileContext = builder.build();
365
366      writer = HFile.getWriterFactoryNoCache(conf)
367          .withPath(fs, mf)
368          .withFileContext(hFileContext)
369          .create();
370    }
371    
372    @Override
373    void doRow(int i) throws Exception {
374      writer.append(createCell(i, generateValue()));
375    }
376
377    private byte[] generateValue() {
378      random.nextBytes(bytes);
379      return bytes;
380    }
381
382    @Override
383    protected int getReportingPeriod() {
384      return this.totalRows; // don't report progress
385    }
386
387    @Override
388    void tearDown() throws Exception {
389      writer.close();
390    }
391
392  }
393
394  static abstract class ReadBenchmark extends RowOrientedBenchmark {
395
396    protected HFile.Reader reader;
397
398    public ReadBenchmark(Configuration conf, FileSystem fs, Path mf,
399        int totalRows) {
400      super(conf, fs, mf, totalRows);
401    }
402
403    @Override
404    void setUp() throws Exception {
405      reader = HFile.createReader(this.fs, this.mf, new CacheConfig(this.conf), true, this.conf);
406    }
407
408    @Override
409    void tearDown() throws Exception {
410      reader.close();
411    }
412
413  }
414
415  static class SequentialReadBenchmark extends ReadBenchmark {
416    private HFileScanner scanner;
417
418    public SequentialReadBenchmark(Configuration conf, FileSystem fs,
419      Path mf, int totalRows) {
420      super(conf, fs, mf, totalRows);
421    }
422
423    @Override
424    void setUp() throws Exception {
425      super.setUp();
426      this.scanner = this.reader.getScanner(false, false);
427      this.scanner.seekTo();
428    }
429
430    @Override
431    void doRow(int i) throws Exception {
432      if (this.scanner.next()) {
433        // TODO: Fix. Make Scanner do Cells.
434        Cell c = this.scanner.getCell();
435        PerformanceEvaluationCommons.assertKey(format(i + 1), c);
436        PerformanceEvaluationCommons.assertValueSize(c.getValueLength(), ROW_LENGTH);
437      }
438    }
439
440    @Override
441    protected int getReportingPeriod() {
442      return this.totalRows; // don't report progress
443    }
444
445  }
446
447  static class UniformRandomReadBenchmark extends ReadBenchmark {
448
449    private Random random = new Random();
450
451    public UniformRandomReadBenchmark(Configuration conf, FileSystem fs,
452        Path mf, int totalRows) {
453      super(conf, fs, mf, totalRows);
454    }
455
456    @Override
457    void doRow(int i) throws Exception {
458      HFileScanner scanner = this.reader.getScanner(false, true);
459      byte [] b = getRandomRow();
460      if (scanner.seekTo(createCell(b)) < 0) {
461        LOG.info("Not able to seekTo " + new String(b));
462        return;
463      }
464      // TODO: Fix scanner so it does Cells
465      Cell c = scanner.getCell();
466      PerformanceEvaluationCommons.assertKey(b, c);
467      PerformanceEvaluationCommons.assertValueSize(c.getValueLength(), ROW_LENGTH);
468    }
469
470    private byte [] getRandomRow() {
471      return format(random.nextInt(totalRows));
472    }
473  }
474
475  static class UniformRandomSmallScan extends ReadBenchmark {
476    private Random random = new Random();
477
478    public UniformRandomSmallScan(Configuration conf, FileSystem fs,
479        Path mf, int totalRows) {
480      super(conf, fs, mf, totalRows/10);
481    }
482
483    @Override
484    void doRow(int i) throws Exception {
485      HFileScanner scanner = this.reader.getScanner(false, false);
486      byte [] b = getRandomRow();
487      // System.out.println("Random row: " + new String(b));
488      Cell c = createCell(b);
489      if (scanner.seekTo(c) != 0) {
490        LOG.info("Nonexistent row: " + new String(b));
491        return;
492      }
493      // TODO: HFileScanner doesn't do Cells yet. Temporary fix.
494      c = scanner.getCell();
495      // System.out.println("Found row: " +
496      //  new String(c.getRowArray(), c.getRowOffset(), c.getRowLength()));
497      PerformanceEvaluationCommons.assertKey(b, c);
498      for (int ii = 0; ii < 30; ii++) {
499        if (!scanner.next()) {
500          LOG.info("NOTHING FOLLOWS");
501          return;
502        }
503        c = scanner.getCell();
504        PerformanceEvaluationCommons.assertValueSize(c.getValueLength(), ROW_LENGTH);
505      }
506    }
507
508    private byte [] getRandomRow() {
509      return format(random.nextInt(totalRows));
510    }
511  }
512
513  static class GaussianRandomReadBenchmark extends ReadBenchmark {
514
515    private RandomData randomData = new RandomDataImpl();
516
517    public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs,
518        Path mf, int totalRows) {
519      super(conf, fs, mf, totalRows);
520    }
521
522    @Override
523    void doRow(int i) throws Exception {
524      HFileScanner scanner = this.reader.getScanner(false, true);
525      byte[] gaussianRandomRowBytes = getGaussianRandomRowBytes();
526      scanner.seekTo(createCell(gaussianRandomRowBytes));
527      for (int ii = 0; ii < 30; ii++) {
528        if (!scanner.next()) {
529          LOG.info("NOTHING FOLLOWS");
530          return;
531        }
532        // TODO: Fix. Make scanner do Cells.
533        scanner.getCell();
534      }
535    }
536
537    private byte [] getGaussianRandomRowBytes() {
538      int r = (int) randomData.nextGaussian((double)totalRows / 2.0,
539          (double)totalRows / 10.0);
540      // make sure r falls into [0,totalRows)
541      return format(Math.min(totalRows, Math.max(r,0)));
542    }
543  }
544
545  /**
546   * @param args
547   * @throws Exception
548   * @throws IOException
549   */
550  public static void main(String[] args) throws Exception {
551    new HFilePerformanceEvaluation().runBenchmarks();
552  }
553
554  private String getCipherName(Configuration conf, String cipherName) {
555    if (cipherName.equals("aes")) {
556      String provider = conf.get(HConstants.CRYPTO_CIPHERPROVIDER_CONF_KEY);
557      if (provider == null || provider.equals("")
558              || provider.equals(DefaultCipherProvider.class.getName())) {
559        return "aes-default";
560      } else if (provider.equals(CryptoCipherProvider.class.getName())) {
561        return "aes-commons";
562      }
563    }
564    return cipherName;
565  }
566}