001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.util.concurrent.TimeUnit; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FSDataInputStream; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.hadoop.hbase.client.Connection; 030import org.apache.hadoop.hbase.client.ConnectionFactory; 031import org.apache.hadoop.hbase.client.Result; 032import org.apache.hadoop.hbase.client.ResultScanner; 033import org.apache.hadoop.hbase.client.Scan; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.client.TableSnapshotScanner; 036import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 037import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 038import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 039import org.apache.hadoop.hbase.mapreduce.TableMapper; 040import org.apache.hadoop.hbase.util.AbstractHBaseTool; 041import org.apache.hadoop.hbase.util.FSUtils; 042import org.apache.hadoop.io.NullWritable; 043import org.apache.hadoop.mapreduce.Counters; 044import org.apache.hadoop.mapreduce.Job; 045import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 046import org.apache.hadoop.util.StringUtils; 047import org.apache.hadoop.util.ToolRunner; 048 049import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch; 050import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 051 052/** 053 * A simple performance evaluation tool for single client and MR scans 054 * and snapshot scans. 055 */ 056@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 057public class ScanPerformanceEvaluation extends AbstractHBaseTool { 058 059 private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters"; 060 061 private String type; 062 private String file; 063 private String tablename; 064 private String snapshotName; 065 private String restoreDir; 066 private String caching; 067 068 @Override 069 public void setConf(Configuration conf) { 070 super.setConf(conf); 071 Path rootDir; 072 try { 073 rootDir = FSUtils.getRootDir(conf); 074 rootDir.getFileSystem(conf); 075 } catch (IOException ex) { 076 throw new RuntimeException(ex); 077 } 078 } 079 080 @Override 081 protected void addOptions() { 082 this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce"); 083 this.addOptWithArg("f", "file", "the filename to read from"); 084 this.addOptWithArg("tn", "table", "the tablename to read from"); 085 this.addOptWithArg("sn", "snapshot", "the snapshot name to read from"); 086 this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot"); 087 this.addOptWithArg("ch", "caching", "scanner caching value"); 088 } 089 090 @Override 091 protected void processOptions(CommandLine cmd) { 092 type = cmd.getOptionValue("type"); 093 file = cmd.getOptionValue("file"); 094 tablename = cmd.getOptionValue("table"); 095 snapshotName = cmd.getOptionValue("snapshot"); 096 restoreDir = cmd.getOptionValue("restoredir"); 097 caching = cmd.getOptionValue("caching"); 098 } 099 100 protected void testHdfsStreaming(Path filename) throws IOException { 101 byte[] buf = new byte[1024]; 102 FileSystem fs = filename.getFileSystem(getConf()); 103 104 // read the file from start to finish 105 Stopwatch fileOpenTimer = Stopwatch.createUnstarted(); 106 Stopwatch streamTimer = Stopwatch.createUnstarted(); 107 108 fileOpenTimer.start(); 109 FSDataInputStream in = fs.open(filename); 110 fileOpenTimer.stop(); 111 112 long totalBytes = 0; 113 streamTimer.start(); 114 while (true) { 115 int read = in.read(buf); 116 if (read < 0) { 117 break; 118 } 119 totalBytes += read; 120 } 121 streamTimer.stop(); 122 123 double throughput = (double)totalBytes / streamTimer.elapsed(TimeUnit.SECONDS); 124 125 System.out.println("HDFS streaming: "); 126 System.out.println("total time to open: " + 127 fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 128 System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 129 System.out.println("total bytes: " + totalBytes + " bytes (" 130 + StringUtils.humanReadableInt(totalBytes) + ")"); 131 System.out.println("throghput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 132 } 133 134 private Scan getScan() { 135 Scan scan = new Scan(); // default scan settings 136 scan.setCacheBlocks(false); 137 scan.setMaxVersions(1); 138 scan.setScanMetricsEnabled(true); 139 if (caching != null) { 140 scan.setCaching(Integer.parseInt(caching)); 141 } 142 143 return scan; 144 } 145 146 public void testScan() throws IOException { 147 Stopwatch tableOpenTimer = Stopwatch.createUnstarted(); 148 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 149 Stopwatch scanTimer = Stopwatch.createUnstarted(); 150 151 tableOpenTimer.start(); 152 Connection connection = ConnectionFactory.createConnection(getConf()); 153 Table table = connection.getTable(TableName.valueOf(tablename)); 154 tableOpenTimer.stop(); 155 156 Scan scan = getScan(); 157 scanOpenTimer.start(); 158 ResultScanner scanner = table.getScanner(scan); 159 scanOpenTimer.stop(); 160 161 long numRows = 0; 162 long numCells = 0; 163 scanTimer.start(); 164 while (true) { 165 Result result = scanner.next(); 166 if (result == null) { 167 break; 168 } 169 numRows++; 170 171 numCells += result.rawCells().length; 172 } 173 scanTimer.stop(); 174 scanner.close(); 175 table.close(); 176 connection.close(); 177 178 ScanMetrics metrics = scan.getScanMetrics(); 179 long totalBytes = metrics.countOfBytesInResults.get(); 180 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 181 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); 182 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); 183 184 System.out.println("HBase scan: "); 185 System.out.println("total time to open table: " + 186 tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 187 System.out.println("total time to open scanner: " + 188 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 189 System.out.println("total time to scan: " + 190 scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 191 192 System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); 193 194 System.out.println("total bytes: " + totalBytes + " bytes (" 195 + StringUtils.humanReadableInt(totalBytes) + ")"); 196 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 197 System.out.println("total rows : " + numRows); 198 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); 199 System.out.println("total cells : " + numCells); 200 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); 201 } 202 203 204 public void testSnapshotScan() throws IOException { 205 Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted(); 206 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 207 Stopwatch scanTimer = Stopwatch.createUnstarted(); 208 209 Path restoreDir = new Path(this.restoreDir); 210 211 snapshotRestoreTimer.start(); 212 restoreDir.getFileSystem(conf).delete(restoreDir, true); 213 snapshotRestoreTimer.stop(); 214 215 Scan scan = getScan(); 216 scanOpenTimer.start(); 217 TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan); 218 scanOpenTimer.stop(); 219 220 long numRows = 0; 221 long numCells = 0; 222 scanTimer.start(); 223 while (true) { 224 Result result = scanner.next(); 225 if (result == null) { 226 break; 227 } 228 numRows++; 229 230 numCells += result.rawCells().length; 231 } 232 scanTimer.stop(); 233 scanner.close(); 234 235 ScanMetrics metrics = scanner.getScanMetrics(); 236 long totalBytes = metrics.countOfBytesInResults.get(); 237 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 238 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); 239 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); 240 241 System.out.println("HBase scan snapshot: "); 242 System.out.println("total time to restore snapshot: " + 243 snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 244 System.out.println("total time to open scanner: " + 245 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 246 System.out.println("total time to scan: " + 247 scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 248 249 System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); 250 251 System.out.println("total bytes: " + totalBytes + " bytes (" 252 + StringUtils.humanReadableInt(totalBytes) + ")"); 253 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 254 System.out.println("total rows : " + numRows); 255 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); 256 System.out.println("total cells : " + numCells); 257 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); 258 259 } 260 261 public static enum ScanCounter { 262 NUM_ROWS, 263 NUM_CELLS, 264 } 265 266 public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> { 267 @Override 268 protected void map(ImmutableBytesWritable key, Result value, 269 Context context) throws IOException, 270 InterruptedException { 271 context.getCounter(ScanCounter.NUM_ROWS).increment(1); 272 context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length); 273 } 274 } 275 276 public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException { 277 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 278 Stopwatch scanTimer = Stopwatch.createUnstarted(); 279 280 Scan scan = getScan(); 281 282 String jobName = "testScanMapReduce"; 283 284 Job job = new Job(conf); 285 job.setJobName(jobName); 286 287 job.setJarByClass(getClass()); 288 289 TableMapReduceUtil.initTableMapperJob( 290 this.tablename, 291 scan, 292 MyMapper.class, 293 NullWritable.class, 294 NullWritable.class, 295 job 296 ); 297 298 job.setNumReduceTasks(0); 299 job.setOutputKeyClass(NullWritable.class); 300 job.setOutputValueClass(NullWritable.class); 301 job.setOutputFormatClass(NullOutputFormat.class); 302 303 scanTimer.start(); 304 job.waitForCompletion(true); 305 scanTimer.stop(); 306 307 Counters counters = job.getCounters(); 308 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); 309 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue(); 310 311 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); 312 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 313 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); 314 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); 315 316 System.out.println("HBase scan mapreduce: "); 317 System.out.println("total time to open scanner: " + 318 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 319 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 320 321 System.out.println("total bytes: " + totalBytes + " bytes (" 322 + StringUtils.humanReadableInt(totalBytes) + ")"); 323 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 324 System.out.println("total rows : " + numRows); 325 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); 326 System.out.println("total cells : " + numCells); 327 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); 328 } 329 330 public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException { 331 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 332 Stopwatch scanTimer = Stopwatch.createUnstarted(); 333 334 Scan scan = getScan(); 335 336 String jobName = "testSnapshotScanMapReduce"; 337 338 Job job = new Job(conf); 339 job.setJobName(jobName); 340 341 job.setJarByClass(getClass()); 342 343 TableMapReduceUtil.initTableSnapshotMapperJob( 344 this.snapshotName, 345 scan, 346 MyMapper.class, 347 NullWritable.class, 348 NullWritable.class, 349 job, 350 true, 351 new Path(restoreDir) 352 ); 353 354 job.setNumReduceTasks(0); 355 job.setOutputKeyClass(NullWritable.class); 356 job.setOutputValueClass(NullWritable.class); 357 job.setOutputFormatClass(NullOutputFormat.class); 358 359 scanTimer.start(); 360 job.waitForCompletion(true); 361 scanTimer.stop(); 362 363 Counters counters = job.getCounters(); 364 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); 365 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue(); 366 367 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); 368 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 369 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); 370 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); 371 372 System.out.println("HBase scan mapreduce: "); 373 System.out.println("total time to open scanner: " + 374 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 375 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 376 377 System.out.println("total bytes: " + totalBytes + " bytes (" 378 + StringUtils.humanReadableInt(totalBytes) + ")"); 379 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 380 System.out.println("total rows : " + numRows); 381 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); 382 System.out.println("total cells : " + numCells); 383 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); 384 } 385 386 @Override 387 protected int doWork() throws Exception { 388 if (type.equals("streaming")) { 389 testHdfsStreaming(new Path(file)); 390 } else if (type.equals("scan")){ 391 testScan(); 392 } else if (type.equals("snapshotscan")) { 393 testSnapshotScan(); 394 } else if (type.equals("scanmapreduce")) { 395 testScanMapReduce(); 396 } else if (type.equals("snapshotscanmapreduce")) { 397 testSnapshotScanMapReduce(); 398 } 399 return 0; 400 } 401 402 public static void main (String[] args) throws Exception { 403 int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args); 404 System.exit(ret); 405 } 406}