001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.util.concurrent.TimeUnit;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.fs.FSDataInputStream;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.client.Connection;
028import org.apache.hadoop.hbase.client.ConnectionFactory;
029import org.apache.hadoop.hbase.client.Result;
030import org.apache.hadoop.hbase.client.ResultScanner;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.client.Table;
033import org.apache.hadoop.hbase.client.TableSnapshotScanner;
034import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
035import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
036import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
037import org.apache.hadoop.hbase.mapreduce.TableMapper;
038import org.apache.hadoop.hbase.util.AbstractHBaseTool;
039import org.apache.hadoop.hbase.util.CommonFSUtils;
040import org.apache.hadoop.io.NullWritable;
041import org.apache.hadoop.mapreduce.Counters;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
044import org.apache.hadoop.util.StringUtils;
045import org.apache.hadoop.util.ToolRunner;
046import org.apache.yetus.audience.InterfaceAudience;
047
048import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch;
049import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
050
051/**
052 * A simple performance evaluation tool for single client and MR scans
053 * and snapshot scans.
054 */
055@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
056public class ScanPerformanceEvaluation extends AbstractHBaseTool {
057
058  private static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";
059
060  private String type;
061  private String file;
062  private String tablename;
063  private String snapshotName;
064  private String restoreDir;
065  private String caching;
066
067  @Override
068  public void setConf(Configuration conf) {
069    super.setConf(conf);
070    Path rootDir;
071    try {
072      rootDir = CommonFSUtils.getRootDir(conf);
073      rootDir.getFileSystem(conf);
074    } catch (IOException ex) {
075      throw new RuntimeException(ex);
076    }
077  }
078
079  @Override
080  protected void addOptions() {
081    this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
082    this.addOptWithArg("f", "file", "the filename to read from");
083    this.addOptWithArg("tn", "table", "the tablename to read from");
084    this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
085    this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
086    this.addOptWithArg("ch", "caching", "scanner caching value");
087  }
088
089  @Override
090  protected void processOptions(CommandLine cmd) {
091    type = cmd.getOptionValue("type");
092    file = cmd.getOptionValue("file");
093    tablename = cmd.getOptionValue("table");
094    snapshotName = cmd.getOptionValue("snapshot");
095    restoreDir = cmd.getOptionValue("restoredir");
096    caching = cmd.getOptionValue("caching");
097  }
098
099  protected void testHdfsStreaming(Path filename) throws IOException {
100    byte[] buf = new byte[1024];
101    FileSystem fs = filename.getFileSystem(getConf());
102
103    // read the file from start to finish
104    Stopwatch fileOpenTimer = Stopwatch.createUnstarted();
105    Stopwatch streamTimer = Stopwatch.createUnstarted();
106
107    fileOpenTimer.start();
108    FSDataInputStream in = fs.open(filename);
109    fileOpenTimer.stop();
110
111    long totalBytes = 0;
112    streamTimer.start();
113    while (true) {
114      int read = in.read(buf);
115      if (read < 0) {
116        break;
117      }
118      totalBytes += read;
119    }
120    streamTimer.stop();
121
122    double throughput = (double)totalBytes / streamTimer.elapsed(TimeUnit.SECONDS);
123
124    System.out.println("HDFS streaming: ");
125    System.out.println("total time to open: " +
126      fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
127    System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
128    System.out.println("total bytes: " + totalBytes + " bytes ("
129        + StringUtils.humanReadableInt(totalBytes) + ")");
130    System.out.println("throghput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
131  }
132
133  private Scan getScan() {
134    Scan scan = new Scan(); // default scan settings
135    scan.setCacheBlocks(false);
136    scan.setMaxVersions(1);
137    scan.setScanMetricsEnabled(true);
138    if (caching != null) {
139      scan.setCaching(Integer.parseInt(caching));
140    }
141
142    return scan;
143  }
144
145  public void testScan() throws IOException {
146    Stopwatch tableOpenTimer = Stopwatch.createUnstarted();
147    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
148    Stopwatch scanTimer = Stopwatch.createUnstarted();
149
150    tableOpenTimer.start();
151    Connection connection = ConnectionFactory.createConnection(getConf());
152    Table table = connection.getTable(TableName.valueOf(tablename));
153    tableOpenTimer.stop();
154
155    Scan scan = getScan();
156    scanOpenTimer.start();
157    ResultScanner scanner = table.getScanner(scan);
158    scanOpenTimer.stop();
159
160    long numRows = 0;
161    long numCells = 0;
162    scanTimer.start();
163    while (true) {
164      Result result = scanner.next();
165      if (result == null) {
166        break;
167      }
168      numRows++;
169
170      numCells += result.rawCells().length;
171    }
172    scanTimer.stop();
173    scanner.close();
174    table.close();
175    connection.close();
176
177    ScanMetrics metrics = scan.getScanMetrics();
178    long totalBytes = metrics.countOfBytesInResults.get();
179    double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
180    double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
181    double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
182
183    System.out.println("HBase scan: ");
184    System.out.println("total time to open table: " +
185      tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
186    System.out.println("total time to open scanner: " +
187      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
188    System.out.println("total time to scan: " +
189      scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
190
191    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
192
193    System.out.println("total bytes: " + totalBytes + " bytes ("
194        + StringUtils.humanReadableInt(totalBytes) + ")");
195    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
196    System.out.println("total rows  : " + numRows);
197    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
198    System.out.println("total cells : " + numCells);
199    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
200  }
201
202
203  public void testSnapshotScan() throws IOException {
204    Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted();
205    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
206    Stopwatch scanTimer = Stopwatch.createUnstarted();
207
208    Path restoreDir = new Path(this.restoreDir);
209
210    snapshotRestoreTimer.start();
211    restoreDir.getFileSystem(conf).delete(restoreDir, true);
212    snapshotRestoreTimer.stop();
213
214    Scan scan = getScan();
215    scanOpenTimer.start();
216    TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
217    scanOpenTimer.stop();
218
219    long numRows = 0;
220    long numCells = 0;
221    scanTimer.start();
222    while (true) {
223      Result result = scanner.next();
224      if (result == null) {
225        break;
226      }
227      numRows++;
228
229      numCells += result.rawCells().length;
230    }
231    scanTimer.stop();
232    scanner.close();
233
234    ScanMetrics metrics = scanner.getScanMetrics();
235    long totalBytes = metrics.countOfBytesInResults.get();
236    double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
237    double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
238    double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
239
240    System.out.println("HBase scan snapshot: ");
241    System.out.println("total time to restore snapshot: " +
242      snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
243    System.out.println("total time to open scanner: " +
244      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
245    System.out.println("total time to scan: " +
246      scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
247
248    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
249
250    System.out.println("total bytes: " + totalBytes + " bytes ("
251        + StringUtils.humanReadableInt(totalBytes) + ")");
252    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
253    System.out.println("total rows  : " + numRows);
254    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
255    System.out.println("total cells : " + numCells);
256    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
257
258  }
259
260  public static enum ScanCounter {
261    NUM_ROWS,
262    NUM_CELLS,
263  }
264
265  public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
266    @Override
267    protected void map(ImmutableBytesWritable key, Result value,
268        Context context) throws IOException,
269        InterruptedException {
270      context.getCounter(ScanCounter.NUM_ROWS).increment(1);
271      context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
272    }
273  }
274
275  public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
276    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
277    Stopwatch scanTimer = Stopwatch.createUnstarted();
278
279    Scan scan = getScan();
280
281    String jobName = "testScanMapReduce";
282
283    Job job = new Job(conf);
284    job.setJobName(jobName);
285
286    job.setJarByClass(getClass());
287
288    TableMapReduceUtil.initTableMapperJob(
289        this.tablename,
290        scan,
291        MyMapper.class,
292        NullWritable.class,
293        NullWritable.class,
294        job
295    );
296
297    job.setNumReduceTasks(0);
298    job.setOutputKeyClass(NullWritable.class);
299    job.setOutputValueClass(NullWritable.class);
300    job.setOutputFormatClass(NullOutputFormat.class);
301
302    scanTimer.start();
303    job.waitForCompletion(true);
304    scanTimer.stop();
305
306    Counters counters = job.getCounters();
307    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
308    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
309
310    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
311    double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
312    double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
313    double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
314
315    System.out.println("HBase scan mapreduce: ");
316    System.out.println("total time to open scanner: " +
317      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
318    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
319
320    System.out.println("total bytes: " + totalBytes + " bytes ("
321        + StringUtils.humanReadableInt(totalBytes) + ")");
322    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
323    System.out.println("total rows  : " + numRows);
324    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
325    System.out.println("total cells : " + numCells);
326    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
327  }
328
329  public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
330    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
331    Stopwatch scanTimer = Stopwatch.createUnstarted();
332
333    Scan scan = getScan();
334
335    String jobName = "testSnapshotScanMapReduce";
336
337    Job job = new Job(conf);
338    job.setJobName(jobName);
339
340    job.setJarByClass(getClass());
341
342    TableMapReduceUtil.initTableSnapshotMapperJob(
343        this.snapshotName,
344        scan,
345        MyMapper.class,
346        NullWritable.class,
347        NullWritable.class,
348        job,
349        true,
350        new Path(restoreDir)
351    );
352
353    job.setNumReduceTasks(0);
354    job.setOutputKeyClass(NullWritable.class);
355    job.setOutputValueClass(NullWritable.class);
356    job.setOutputFormatClass(NullOutputFormat.class);
357
358    scanTimer.start();
359    job.waitForCompletion(true);
360    scanTimer.stop();
361
362    Counters counters = job.getCounters();
363    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
364    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
365
366    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
367    double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
368    double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
369    double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
370
371    System.out.println("HBase scan mapreduce: ");
372    System.out.println("total time to open scanner: " +
373      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
374    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
375
376    System.out.println("total bytes: " + totalBytes + " bytes ("
377        + StringUtils.humanReadableInt(totalBytes) + ")");
378    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
379    System.out.println("total rows  : " + numRows);
380    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
381    System.out.println("total cells : " + numCells);
382    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
383  }
384
385  @Override
386  protected int doWork() throws Exception {
387    if (type.equals("streaming")) {
388      testHdfsStreaming(new Path(file));
389    } else if (type.equals("scan")){
390      testScan();
391    } else if (type.equals("snapshotscan")) {
392      testSnapshotScan();
393    } else if (type.equals("scanmapreduce")) {
394      testScanMapReduce();
395    } else if (type.equals("snapshotscanmapreduce")) {
396      testSnapshotScanMapReduce();
397    }
398    return 0;
399  }
400
401  public static void main (String[] args) throws Exception {
402    int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
403    System.exit(ret);
404  }
405}