001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.util.concurrent.TimeUnit; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.FSDataInputStream; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.client.Connection; 027import org.apache.hadoop.hbase.client.ConnectionFactory; 028import org.apache.hadoop.hbase.client.Result; 029import org.apache.hadoop.hbase.client.ResultScanner; 030import org.apache.hadoop.hbase.client.Scan; 031import org.apache.hadoop.hbase.client.Table; 032import org.apache.hadoop.hbase.client.TableSnapshotScanner; 033import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 034import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 035import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 036import org.apache.hadoop.hbase.mapreduce.TableMapper; 037import org.apache.hadoop.hbase.util.AbstractHBaseTool; 038import org.apache.hadoop.hbase.util.CommonFSUtils; 039import org.apache.hadoop.io.NullWritable; 040import org.apache.hadoop.mapreduce.Counters; 041import org.apache.hadoop.mapreduce.Job; 042import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 043import org.apache.hadoop.util.StringUtils; 044import org.apache.hadoop.util.ToolRunner; 045import org.apache.yetus.audience.InterfaceAudience; 046 047import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch; 048import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 049 050/** 051 * A simple performance evaluation tool for single client and MR scans and snapshot scans. 052 */ 053@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 054public class ScanPerformanceEvaluation extends AbstractHBaseTool { 055 056 private static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters"; 057 058 private String type; 059 private String file; 060 private String tablename; 061 private String snapshotName; 062 private String restoreDir; 063 private String caching; 064 065 @Override 066 public void setConf(Configuration conf) { 067 super.setConf(conf); 068 Path rootDir; 069 try { 070 rootDir = CommonFSUtils.getRootDir(conf); 071 rootDir.getFileSystem(conf); 072 } catch (IOException ex) { 073 throw new RuntimeException(ex); 074 } 075 } 076 077 @Override 078 protected void addOptions() { 079 this.addRequiredOptWithArg("t", "type", 080 "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce"); 081 this.addOptWithArg("f", "file", "the filename to read from"); 082 this.addOptWithArg("tn", "table", "the tablename to read from"); 083 this.addOptWithArg("sn", "snapshot", "the snapshot name to read from"); 084 this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot"); 085 this.addOptWithArg("ch", "caching", "scanner caching value"); 086 } 087 088 @Override 089 protected void processOptions(CommandLine cmd) { 090 type = cmd.getOptionValue("type"); 091 file = cmd.getOptionValue("file"); 092 tablename = cmd.getOptionValue("table"); 093 snapshotName = cmd.getOptionValue("snapshot"); 094 restoreDir = cmd.getOptionValue("restoredir"); 095 caching = cmd.getOptionValue("caching"); 096 } 097 098 protected void testHdfsStreaming(Path filename) throws IOException { 099 byte[] buf = new byte[1024]; 100 FileSystem fs = filename.getFileSystem(getConf()); 101 102 // read the file from start to finish 103 Stopwatch fileOpenTimer = Stopwatch.createUnstarted(); 104 Stopwatch streamTimer = Stopwatch.createUnstarted(); 105 106 fileOpenTimer.start(); 107 FSDataInputStream in = fs.open(filename); 108 fileOpenTimer.stop(); 109 110 long totalBytes = 0; 111 streamTimer.start(); 112 while (true) { 113 int read = in.read(buf); 114 if (read < 0) { 115 break; 116 } 117 totalBytes += read; 118 } 119 streamTimer.stop(); 120 121 double throughput = (double) totalBytes / streamTimer.elapsed(TimeUnit.SECONDS); 122 123 System.out.println("HDFS streaming: "); 124 System.out 125 .println("total time to open: " + fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 126 System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 127 System.out.println( 128 "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")"); 129 System.out.println("throghput : " + StringUtils.humanReadableInt((long) throughput) + "B/s"); 130 } 131 132 private Scan getScan() { 133 Scan scan = new Scan(); // default scan settings 134 scan.setCacheBlocks(false); 135 scan.setMaxVersions(1); 136 scan.setScanMetricsEnabled(true); 137 if (caching != null) { 138 scan.setCaching(Integer.parseInt(caching)); 139 } 140 141 return scan; 142 } 143 144 public void testScan() throws IOException { 145 Stopwatch tableOpenTimer = Stopwatch.createUnstarted(); 146 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 147 Stopwatch scanTimer = Stopwatch.createUnstarted(); 148 149 tableOpenTimer.start(); 150 Connection connection = ConnectionFactory.createConnection(getConf()); 151 Table table = connection.getTable(TableName.valueOf(tablename)); 152 tableOpenTimer.stop(); 153 154 Scan scan = getScan(); 155 scanOpenTimer.start(); 156 ResultScanner scanner = table.getScanner(scan); 157 scanOpenTimer.stop(); 158 159 long numRows = 0; 160 long numCells = 0; 161 scanTimer.start(); 162 while (true) { 163 Result result = scanner.next(); 164 if (result == null) { 165 break; 166 } 167 numRows++; 168 169 numCells += result.rawCells().length; 170 } 171 scanTimer.stop(); 172 scanner.close(); 173 table.close(); 174 connection.close(); 175 176 ScanMetrics metrics = scan.getScanMetrics(); 177 long totalBytes = metrics.countOfBytesInResults.get(); 178 double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 179 double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS); 180 double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS); 181 182 System.out.println("HBase scan: "); 183 System.out.println( 184 "total time to open table: " + tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 185 System.out.println( 186 "total time to open scanner: " + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 187 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 188 189 System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); 190 191 System.out.println( 192 "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")"); 193 System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "B/s"); 194 System.out.println("total rows : " + numRows); 195 System.out 196 .println("throughput : " + StringUtils.humanReadableInt((long) throughputRows) + " rows/s"); 197 System.out.println("total cells : " + numCells); 198 System.out.println( 199 "throughput : " + StringUtils.humanReadableInt((long) throughputCells) + " cells/s"); 200 } 201 202 public void testSnapshotScan() throws IOException { 203 Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted(); 204 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 205 Stopwatch scanTimer = Stopwatch.createUnstarted(); 206 207 Path restoreDir = new Path(this.restoreDir); 208 209 snapshotRestoreTimer.start(); 210 restoreDir.getFileSystem(conf).delete(restoreDir, true); 211 snapshotRestoreTimer.stop(); 212 213 Scan scan = getScan(); 214 scanOpenTimer.start(); 215 TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan); 216 scanOpenTimer.stop(); 217 218 long numRows = 0; 219 long numCells = 0; 220 scanTimer.start(); 221 while (true) { 222 Result result = scanner.next(); 223 if (result == null) { 224 break; 225 } 226 numRows++; 227 228 numCells += result.rawCells().length; 229 } 230 scanTimer.stop(); 231 scanner.close(); 232 233 ScanMetrics metrics = scanner.getScanMetrics(); 234 long totalBytes = metrics.countOfBytesInResults.get(); 235 double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 236 double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS); 237 double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS); 238 239 System.out.println("HBase scan snapshot: "); 240 System.out.println("total time to restore snapshot: " 241 + snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 242 System.out.println( 243 "total time to open scanner: " + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 244 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 245 246 System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); 247 248 System.out.println( 249 "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")"); 250 System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "B/s"); 251 System.out.println("total rows : " + numRows); 252 System.out 253 .println("throughput : " + StringUtils.humanReadableInt((long) throughputRows) + " rows/s"); 254 System.out.println("total cells : " + numCells); 255 System.out.println( 256 "throughput : " + StringUtils.humanReadableInt((long) throughputCells) + " cells/s"); 257 258 } 259 260 public static enum ScanCounter { 261 NUM_ROWS, 262 NUM_CELLS, 263 } 264 265 public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> { 266 @Override 267 protected void map(ImmutableBytesWritable key, Result value, Context context) 268 throws IOException, InterruptedException { 269 context.getCounter(ScanCounter.NUM_ROWS).increment(1); 270 context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length); 271 } 272 } 273 274 public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException { 275 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 276 Stopwatch scanTimer = Stopwatch.createUnstarted(); 277 278 Scan scan = getScan(); 279 280 String jobName = "testScanMapReduce"; 281 282 Job job = new Job(conf); 283 job.setJobName(jobName); 284 285 job.setJarByClass(getClass()); 286 287 TableMapReduceUtil.initTableMapperJob(this.tablename, scan, MyMapper.class, NullWritable.class, 288 NullWritable.class, job); 289 290 job.setNumReduceTasks(0); 291 job.setOutputKeyClass(NullWritable.class); 292 job.setOutputValueClass(NullWritable.class); 293 job.setOutputFormatClass(NullOutputFormat.class); 294 295 scanTimer.start(); 296 job.waitForCompletion(true); 297 scanTimer.stop(); 298 299 Counters counters = job.getCounters(); 300 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); 301 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue(); 302 303 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); 304 double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 305 double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS); 306 double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS); 307 308 System.out.println("HBase scan mapreduce: "); 309 System.out.println( 310 "total time to open scanner: " + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 311 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 312 313 System.out.println( 314 "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")"); 315 System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "B/s"); 316 System.out.println("total rows : " + numRows); 317 System.out 318 .println("throughput : " + StringUtils.humanReadableInt((long) throughputRows) + " rows/s"); 319 System.out.println("total cells : " + numCells); 320 System.out.println( 321 "throughput : " + StringUtils.humanReadableInt((long) throughputCells) + " cells/s"); 322 } 323 324 public void testSnapshotScanMapReduce() 325 throws IOException, InterruptedException, ClassNotFoundException { 326 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 327 Stopwatch scanTimer = Stopwatch.createUnstarted(); 328 329 Scan scan = getScan(); 330 331 String jobName = "testSnapshotScanMapReduce"; 332 333 Job job = new Job(conf); 334 job.setJobName(jobName); 335 336 job.setJarByClass(getClass()); 337 338 TableMapReduceUtil.initTableSnapshotMapperJob(this.snapshotName, scan, MyMapper.class, 339 NullWritable.class, NullWritable.class, job, true, new Path(restoreDir)); 340 341 job.setNumReduceTasks(0); 342 job.setOutputKeyClass(NullWritable.class); 343 job.setOutputValueClass(NullWritable.class); 344 job.setOutputFormatClass(NullOutputFormat.class); 345 346 scanTimer.start(); 347 job.waitForCompletion(true); 348 scanTimer.stop(); 349 350 Counters counters = job.getCounters(); 351 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); 352 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue(); 353 354 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); 355 double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 356 double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS); 357 double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS); 358 359 System.out.println("HBase scan mapreduce: "); 360 System.out.println( 361 "total time to open scanner: " + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 362 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 363 364 System.out.println( 365 "total bytes: " + totalBytes + " bytes (" + StringUtils.humanReadableInt(totalBytes) + ")"); 366 System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "B/s"); 367 System.out.println("total rows : " + numRows); 368 System.out 369 .println("throughput : " + StringUtils.humanReadableInt((long) throughputRows) + " rows/s"); 370 System.out.println("total cells : " + numCells); 371 System.out.println( 372 "throughput : " + StringUtils.humanReadableInt((long) throughputCells) + " cells/s"); 373 } 374 375 @Override 376 protected int doWork() throws Exception { 377 if (type.equals("streaming")) { 378 testHdfsStreaming(new Path(file)); 379 } else if (type.equals("scan")) { 380 testScan(); 381 } else if (type.equals("snapshotscan")) { 382 testSnapshotScan(); 383 } else if (type.equals("scanmapreduce")) { 384 testScanMapReduce(); 385 } else if (type.equals("snapshotscanmapreduce")) { 386 testSnapshotScanMapReduce(); 387 } 388 return 0; 389 } 390 391 public static void main(String[] args) throws Exception { 392 int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args); 393 System.exit(ret); 394 } 395}