001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.concurrent.TimeUnit;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.FSDataInputStream;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.client.ConnectionFactory;
028import org.apache.hadoop.hbase.client.Result;
029import org.apache.hadoop.hbase.client.ResultScanner;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.client.TableSnapshotScanner;
033import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
034import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
035import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
036import org.apache.hadoop.hbase.mapreduce.TableMapper;
037import org.apache.hadoop.hbase.util.AbstractHBaseTool;
038import org.apache.hadoop.hbase.util.CommonFSUtils;
039import org.apache.hadoop.io.NullWritable;
040import org.apache.hadoop.mapreduce.Counters;
041import org.apache.hadoop.mapreduce.Job;
042import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
043import org.apache.hadoop.util.StringUtils;
044import org.apache.hadoop.util.ToolRunner;
045import org.apache.yetus.audience.InterfaceAudience;
046
047import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch;
048import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
049
050/**
051 * A simple performance evaluation tool for single client and MR scans and snapshot scans.
052 */
053@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
054public class ScanPerformanceEvaluation extends AbstractHBaseTool {
055
056  private static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";
057
058  private String type;
059  private String file;
060  private String tablename;
061  private String snapshotName;
062  private String restoreDir;
063  private String caching;
064
065  @Override
066  public void setConf(Configuration conf) {
067    super.setConf(conf);
068    Path rootDir;
069    try {
070      rootDir = CommonFSUtils.getRootDir(conf);
071      rootDir.getFileSystem(conf);
072    } catch (IOException ex) {
073      throw new RuntimeException(ex);
074    }
075  }
076
077  @Override
078  protected void addOptions() {
079    this.addRequiredOptWithArg("t", "type",
080      "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
081    this.addOptWithArg("f", "file", "the filename to read from");
082    this.addOptWithArg("tn", "table", "the tablename to read from");
083    this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
084    this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
085    this.addOptWithArg("ch", "caching", "scanner caching value");
086  }
087
088  @Override
089  protected void processOptions(CommandLine cmd) {
090    type = cmd.getOptionValue("type");
091    file = cmd.getOptionValue("file");
092    tablename = cmd.getOptionValue("table");
093    snapshotName = cmd.getOptionValue("snapshot");
094    restoreDir = cmd.getOptionValue("restoredir");
095    caching = cmd.getOptionValue("caching");
096  }
097
098  protected void testHdfsStreaming(Path filename) throws IOException {
099    byte[] buf = new byte[1024];
100    FileSystem fs = filename.getFileSystem(getConf());
101
102    // read the file from start to finish
103    Stopwatch fileOpenTimer = Stopwatch.createUnstarted();
104    Stopwatch streamTimer = Stopwatch.createUnstarted();
105
106    fileOpenTimer.start();
107    FSDataInputStream in = fs.open(filename);
108    fileOpenTimer.stop();
109
110    long totalBytes = 0;
111    streamTimer.start();
112    while (true) {
113      int read = in.read(buf);
114      if (read < 0) {
115        break;
116      }
117      totalBytes += read;
118    }
119    streamTimer.stop();
120
121    double throughput = (double) totalBytes / streamTimer.elapsed(TimeUnit.SECONDS);
122
123    System.out.println("HDFS streaming: ");
124    System.out
125      .println("total time to open: " + fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
126    System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
127    System.out.println(
128      "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")");
129    System.out.println("throghput  : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
130  }
131
132  private Scan getScan() {
133    Scan scan = new Scan(); // default scan settings
134    scan.setCacheBlocks(false);
135    scan.readVersions(1);
136    scan.setScanMetricsEnabled(true);
137    if (caching != null) {
138      scan.setCaching(Integer.parseInt(caching));
139    }
140
141    return scan;
142  }
143
144  public void testScan() throws IOException {
145    Stopwatch tableOpenTimer = Stopwatch.createUnstarted();
146    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
147    Stopwatch scanTimer = Stopwatch.createUnstarted();
148
149    tableOpenTimer.start();
150    Connection connection = ConnectionFactory.createConnection(getConf());
151    Table table = connection.getTable(TableName.valueOf(tablename));
152    tableOpenTimer.stop();
153
154    Scan scan = getScan();
155    scanOpenTimer.start();
156    ResultScanner scanner = table.getScanner(scan);
157    scanOpenTimer.stop();
158
159    long numRows = 0;
160    long numCells = 0;
161    scanTimer.start();
162    while (true) {
163      Result result = scanner.next();
164      if (result == null) {
165        break;
166      }
167      numRows++;
168
169      numCells += result.rawCells().length;
170    }
171    scanTimer.stop();
172    scanner.close();
173    table.close();
174    connection.close();
175
176    ScanMetrics metrics = scanner.getScanMetrics();
177    long totalBytes = metrics.countOfBytesInResults.get();
178    double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
179    double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS);
180    double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS);
181
182    System.out.println("HBase scan: ");
183    System.out.println(
184      "total time to open table: " + tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
185    System.out.println(
186      "total time to open scanner: " + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
187    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
188
189    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
190
191    System.out.println(
192      "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")");
193    System.out.println("throughput  : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
194    System.out.println("total rows  : " + numRows);
195    System.out
196      .println("throughput  : " + StringUtils.humanReadableInt((long) throughputRows) + " rows/s");
197    System.out.println("total cells : " + numCells);
198    System.out.println(
199      "throughput  : " + StringUtils.humanReadableInt((long) throughputCells) + " cells/s");
200  }
201
202  public void testSnapshotScan() throws IOException {
203    Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted();
204    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
205    Stopwatch scanTimer = Stopwatch.createUnstarted();
206
207    Path restoreDir = new Path(this.restoreDir);
208
209    snapshotRestoreTimer.start();
210    restoreDir.getFileSystem(conf).delete(restoreDir, true);
211    snapshotRestoreTimer.stop();
212
213    Scan scan = getScan();
214    scanOpenTimer.start();
215    TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
216    scanOpenTimer.stop();
217
218    long numRows = 0;
219    long numCells = 0;
220    scanTimer.start();
221    while (true) {
222      Result result = scanner.next();
223      if (result == null) {
224        break;
225      }
226      numRows++;
227
228      numCells += result.rawCells().length;
229    }
230    scanTimer.stop();
231    scanner.close();
232
233    ScanMetrics metrics = scanner.getScanMetrics();
234    long totalBytes = metrics.countOfBytesInResults.get();
235    double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
236    double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS);
237    double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS);
238
239    System.out.println("HBase scan snapshot: ");
240    System.out.println("total time to restore snapshot: "
241      + snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
242    System.out.println(
243      "total time to open scanner: " + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
244    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
245
246    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
247
248    System.out.println(
249      "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")");
250    System.out.println("throughput  : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
251    System.out.println("total rows  : " + numRows);
252    System.out
253      .println("throughput  : " + StringUtils.humanReadableInt((long) throughputRows) + " rows/s");
254    System.out.println("total cells : " + numCells);
255    System.out.println(
256      "throughput  : " + StringUtils.humanReadableInt((long) throughputCells) + " cells/s");
257
258  }
259
260  public static enum ScanCounter {
261    NUM_ROWS,
262    NUM_CELLS,
263  }
264
265  public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
266    @Override
267    protected void map(ImmutableBytesWritable key, Result value, Context context)
268      throws IOException, InterruptedException {
269      context.getCounter(ScanCounter.NUM_ROWS).increment(1);
270      context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
271    }
272  }
273
274  public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
275    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
276    Stopwatch scanTimer = Stopwatch.createUnstarted();
277
278    Scan scan = getScan();
279
280    String jobName = "testScanMapReduce";
281
282    Job job = new Job(conf);
283    job.setJobName(jobName);
284
285    job.setJarByClass(getClass());
286
287    TableMapReduceUtil.initTableMapperJob(this.tablename, scan, MyMapper.class, NullWritable.class,
288      NullWritable.class, job);
289
290    job.setNumReduceTasks(0);
291    job.setOutputKeyClass(NullWritable.class);
292    job.setOutputValueClass(NullWritable.class);
293    job.setOutputFormatClass(NullOutputFormat.class);
294
295    scanTimer.start();
296    job.waitForCompletion(true);
297    scanTimer.stop();
298
299    Counters counters = job.getCounters();
300    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
301    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
302
303    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
304    double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
305    double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS);
306    double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS);
307
308    System.out.println("HBase scan mapreduce: ");
309    System.out.println(
310      "total time to open scanner: " + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
311    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
312
313    System.out.println(
314      "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")");
315    System.out.println("throughput  : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
316    System.out.println("total rows  : " + numRows);
317    System.out
318      .println("throughput  : " + StringUtils.humanReadableInt((long) throughputRows) + " rows/s");
319    System.out.println("total cells : " + numCells);
320    System.out.println(
321      "throughput  : " + StringUtils.humanReadableInt((long) throughputCells) + " cells/s");
322  }
323
324  public void testSnapshotScanMapReduce()
325    throws IOException, InterruptedException, ClassNotFoundException {
326    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
327    Stopwatch scanTimer = Stopwatch.createUnstarted();
328
329    Scan scan = getScan();
330
331    String jobName = "testSnapshotScanMapReduce";
332
333    Job job = new Job(conf);
334    job.setJobName(jobName);
335
336    job.setJarByClass(getClass());
337
338    TableMapReduceUtil.initTableSnapshotMapperJob(this.snapshotName, scan, MyMapper.class,
339      NullWritable.class, NullWritable.class, job, true, new Path(restoreDir));
340
341    job.setNumReduceTasks(0);
342    job.setOutputKeyClass(NullWritable.class);
343    job.setOutputValueClass(NullWritable.class);
344    job.setOutputFormatClass(NullOutputFormat.class);
345
346    scanTimer.start();
347    job.waitForCompletion(true);
348    scanTimer.stop();
349
350    Counters counters = job.getCounters();
351    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
352    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
353
354    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
355    double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
356    double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS);
357    double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS);
358
359    System.out.println("HBase scan mapreduce: ");
360    System.out.println(
361      "total time to open scanner: " + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
362    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
363
364    System.out.println(
365      "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")");
366    System.out.println("throughput  : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
367    System.out.println("total rows  : " + numRows);
368    System.out
369      .println("throughput  : " + StringUtils.humanReadableInt((long) throughputRows) + " rows/s");
370    System.out.println("total cells : " + numCells);
371    System.out.println(
372      "throughput  : " + StringUtils.humanReadableInt((long) throughputCells) + " cells/s");
373  }
374
375  @Override
376  protected int doWork() throws Exception {
377    if (type.equals("streaming")) {
378      testHdfsStreaming(new Path(file));
379    } else if (type.equals("scan")) {
380      testScan();
381    } else if (type.equals("snapshotscan")) {
382      testSnapshotScan();
383    } else if (type.equals("scanmapreduce")) {
384      testScanMapReduce();
385    } else if (type.equals("snapshotscanmapreduce")) {
386      testSnapshotScanMapReduce();
387    }
388    return 0;
389  }
390
391  public static void main(String[] args) throws Exception {
392    int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
393    System.exit(ret);
394  }
395}