001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FSDataInputStream;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.client.ConnectionFactory;
031import org.apache.hadoop.hbase.client.Result;
032import org.apache.hadoop.hbase.client.ResultScanner;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableSnapshotScanner;
036import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
037import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
038import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
039import org.apache.hadoop.hbase.mapreduce.TableMapper;
040import org.apache.hadoop.hbase.util.AbstractHBaseTool;
041import org.apache.hadoop.hbase.util.FSUtils;
042import org.apache.hadoop.io.NullWritable;
043import org.apache.hadoop.mapreduce.Counters;
044import org.apache.hadoop.mapreduce.Job;
045import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
046import org.apache.hadoop.util.StringUtils;
047import org.apache.hadoop.util.ToolRunner;
048
049import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch;
050import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
051
052/**
053 * A simple performance evaluation tool for single client and MR scans
054 * and snapshot scans.
055 */
056@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
057public class ScanPerformanceEvaluation extends AbstractHBaseTool {
058
  /** Counter group from which the MapReduce tests read the "BYTES_IN_RESULTS" counter. */
  private static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";

  /** Value of the required "type" option; selects the test that doWork() runs. */
  private String type;
  /** Value of the "file" option; the path read by the HDFS streaming test. */
  private String file;
  /** Value of the "table" option; the table scanned by the scan and scan-mapreduce tests. */
  private String tablename;
  /** Value of the "snapshot" option; the snapshot read by the snapshot tests. */
  private String snapshotName;
  /** Value of the "restoredir" option; the directory handed to the snapshot tests. */
  private String restoreDir;
  /** Value of the "caching" option; parsed as an int by getScan() when it is not null. */
  private String caching;
067
068  @Override
069  public void setConf(Configuration conf) {
070    super.setConf(conf);
071    Path rootDir;
072    try {
073      rootDir = FSUtils.getRootDir(conf);
074      rootDir.getFileSystem(conf);
075    } catch (IOException ex) {
076      throw new RuntimeException(ex);
077    }
078  }
079
080  @Override
081  protected void addOptions() {
082    this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
083    this.addOptWithArg("f", "file", "the filename to read from");
084    this.addOptWithArg("tn", "table", "the tablename to read from");
085    this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
086    this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
087    this.addOptWithArg("ch", "caching", "scanner caching value");
088  }
089
090  @Override
091  protected void processOptions(CommandLine cmd) {
092    type = cmd.getOptionValue("type");
093    file = cmd.getOptionValue("file");
094    tablename = cmd.getOptionValue("table");
095    snapshotName = cmd.getOptionValue("snapshot");
096    restoreDir = cmd.getOptionValue("restoredir");
097    caching = cmd.getOptionValue("caching");
098  }
099
100  protected void testHdfsStreaming(Path filename) throws IOException {
101    byte[] buf = new byte[1024];
102    FileSystem fs = filename.getFileSystem(getConf());
103
104    // read the file from start to finish
105    Stopwatch fileOpenTimer = Stopwatch.createUnstarted();
106    Stopwatch streamTimer = Stopwatch.createUnstarted();
107
108    fileOpenTimer.start();
109    FSDataInputStream in = fs.open(filename);
110    fileOpenTimer.stop();
111
112    long totalBytes = 0;
113    streamTimer.start();
114    while (true) {
115      int read = in.read(buf);
116      if (read < 0) {
117        break;
118      }
119      totalBytes += read;
120    }
121    streamTimer.stop();
122
123    double throughput = (double)totalBytes / streamTimer.elapsed(TimeUnit.SECONDS);
124
125    System.out.println("HDFS streaming: ");
126    System.out.println("total time to open: " +
127      fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
128    System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
129    System.out.println("total bytes: " + totalBytes + " bytes ("
130        + StringUtils.humanReadableInt(totalBytes) + ")");
131    System.out.println("throghput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
132  }
133
134  private Scan getScan() {
135    Scan scan = new Scan(); // default scan settings
136    scan.setCacheBlocks(false);
137    scan.setMaxVersions(1);
138    scan.setScanMetricsEnabled(true);
139    if (caching != null) {
140      scan.setCaching(Integer.parseInt(caching));
141    }
142
143    return scan;
144  }
145
146  public void testScan() throws IOException {
147    Stopwatch tableOpenTimer = Stopwatch.createUnstarted();
148    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
149    Stopwatch scanTimer = Stopwatch.createUnstarted();
150
151    tableOpenTimer.start();
152    Connection connection = ConnectionFactory.createConnection(getConf());
153    Table table = connection.getTable(TableName.valueOf(tablename));
154    tableOpenTimer.stop();
155
156    Scan scan = getScan();
157    scanOpenTimer.start();
158    ResultScanner scanner = table.getScanner(scan);
159    scanOpenTimer.stop();
160
161    long numRows = 0;
162    long numCells = 0;
163    scanTimer.start();
164    while (true) {
165      Result result = scanner.next();
166      if (result == null) {
167        break;
168      }
169      numRows++;
170
171      numCells += result.rawCells().length;
172    }
173    scanTimer.stop();
174    scanner.close();
175    table.close();
176    connection.close();
177
178    ScanMetrics metrics = scan.getScanMetrics();
179    long totalBytes = metrics.countOfBytesInResults.get();
180    double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
181    double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
182    double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
183
184    System.out.println("HBase scan: ");
185    System.out.println("total time to open table: " +
186      tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
187    System.out.println("total time to open scanner: " +
188      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
189    System.out.println("total time to scan: " +
190      scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
191
192    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
193
194    System.out.println("total bytes: " + totalBytes + " bytes ("
195        + StringUtils.humanReadableInt(totalBytes) + ")");
196    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
197    System.out.println("total rows  : " + numRows);
198    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
199    System.out.println("total cells : " + numCells);
200    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
201  }
202
203
204  public void testSnapshotScan() throws IOException {
205    Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted();
206    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
207    Stopwatch scanTimer = Stopwatch.createUnstarted();
208
209    Path restoreDir = new Path(this.restoreDir);
210
211    snapshotRestoreTimer.start();
212    restoreDir.getFileSystem(conf).delete(restoreDir, true);
213    snapshotRestoreTimer.stop();
214
215    Scan scan = getScan();
216    scanOpenTimer.start();
217    TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
218    scanOpenTimer.stop();
219
220    long numRows = 0;
221    long numCells = 0;
222    scanTimer.start();
223    while (true) {
224      Result result = scanner.next();
225      if (result == null) {
226        break;
227      }
228      numRows++;
229
230      numCells += result.rawCells().length;
231    }
232    scanTimer.stop();
233    scanner.close();
234
235    ScanMetrics metrics = scanner.getScanMetrics();
236    long totalBytes = metrics.countOfBytesInResults.get();
237    double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
238    double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
239    double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
240
241    System.out.println("HBase scan snapshot: ");
242    System.out.println("total time to restore snapshot: " +
243      snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
244    System.out.println("total time to open scanner: " +
245      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
246    System.out.println("total time to scan: " +
247      scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
248
249    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
250
251    System.out.println("total bytes: " + totalBytes + " bytes ("
252        + StringUtils.humanReadableInt(totalBytes) + ")");
253    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
254    System.out.println("total rows  : " + numRows);
255    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
256    System.out.println("total cells : " + numCells);
257    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
258
259  }
260
261  public static enum ScanCounter {
262    NUM_ROWS,
263    NUM_CELLS,
264  }
265
266  public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
267    @Override
268    protected void map(ImmutableBytesWritable key, Result value,
269        Context context) throws IOException,
270        InterruptedException {
271      context.getCounter(ScanCounter.NUM_ROWS).increment(1);
272      context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
273    }
274  }
275
276  public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
277    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
278    Stopwatch scanTimer = Stopwatch.createUnstarted();
279
280    Scan scan = getScan();
281
282    String jobName = "testScanMapReduce";
283
284    Job job = new Job(conf);
285    job.setJobName(jobName);
286
287    job.setJarByClass(getClass());
288
289    TableMapReduceUtil.initTableMapperJob(
290        this.tablename,
291        scan,
292        MyMapper.class,
293        NullWritable.class,
294        NullWritable.class,
295        job
296    );
297
298    job.setNumReduceTasks(0);
299    job.setOutputKeyClass(NullWritable.class);
300    job.setOutputValueClass(NullWritable.class);
301    job.setOutputFormatClass(NullOutputFormat.class);
302
303    scanTimer.start();
304    job.waitForCompletion(true);
305    scanTimer.stop();
306
307    Counters counters = job.getCounters();
308    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
309    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
310
311    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
312    double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
313    double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
314    double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
315
316    System.out.println("HBase scan mapreduce: ");
317    System.out.println("total time to open scanner: " +
318      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
319    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
320
321    System.out.println("total bytes: " + totalBytes + " bytes ("
322        + StringUtils.humanReadableInt(totalBytes) + ")");
323    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
324    System.out.println("total rows  : " + numRows);
325    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
326    System.out.println("total cells : " + numCells);
327    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
328  }
329
330  public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
331    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
332    Stopwatch scanTimer = Stopwatch.createUnstarted();
333
334    Scan scan = getScan();
335
336    String jobName = "testSnapshotScanMapReduce";
337
338    Job job = new Job(conf);
339    job.setJobName(jobName);
340
341    job.setJarByClass(getClass());
342
343    TableMapReduceUtil.initTableSnapshotMapperJob(
344        this.snapshotName,
345        scan,
346        MyMapper.class,
347        NullWritable.class,
348        NullWritable.class,
349        job,
350        true,
351        new Path(restoreDir)
352    );
353
354    job.setNumReduceTasks(0);
355    job.setOutputKeyClass(NullWritable.class);
356    job.setOutputValueClass(NullWritable.class);
357    job.setOutputFormatClass(NullOutputFormat.class);
358
359    scanTimer.start();
360    job.waitForCompletion(true);
361    scanTimer.stop();
362
363    Counters counters = job.getCounters();
364    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
365    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
366
367    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
368    double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
369    double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
370    double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
371
372    System.out.println("HBase scan mapreduce: ");
373    System.out.println("total time to open scanner: " +
374      scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
375    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
376
377    System.out.println("total bytes: " + totalBytes + " bytes ("
378        + StringUtils.humanReadableInt(totalBytes) + ")");
379    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
380    System.out.println("total rows  : " + numRows);
381    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
382    System.out.println("total cells : " + numCells);
383    System.out.println("throughput  : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
384  }
385
386  @Override
387  protected int doWork() throws Exception {
388    if (type.equals("streaming")) {
389      testHdfsStreaming(new Path(file));
390    } else if (type.equals("scan")){
391      testScan();
392    } else if (type.equals("snapshotscan")) {
393      testSnapshotScan();
394    } else if (type.equals("scanmapreduce")) {
395      testScanMapReduce();
396    } else if (type.equals("snapshotscanmapreduce")) {
397      testSnapshotScanMapReduce();
398    }
399    return 0;
400  }
401
402  public static void main (String[] args) throws Exception {
403    int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
404    System.exit(ret);
405  }
406}