001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.util.concurrent.TimeUnit; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.fs.FSDataInputStream; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.client.Connection; 028import org.apache.hadoop.hbase.client.ConnectionFactory; 029import org.apache.hadoop.hbase.client.Result; 030import org.apache.hadoop.hbase.client.ResultScanner; 031import org.apache.hadoop.hbase.client.Scan; 032import org.apache.hadoop.hbase.client.Table; 033import org.apache.hadoop.hbase.client.TableSnapshotScanner; 034import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 035import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 036import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 037import org.apache.hadoop.hbase.mapreduce.TableMapper; 038import org.apache.hadoop.hbase.util.AbstractHBaseTool; 039import org.apache.hadoop.hbase.util.CommonFSUtils; 040import org.apache.hadoop.io.NullWritable; 041import org.apache.hadoop.mapreduce.Counters; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 044import org.apache.hadoop.util.StringUtils; 045import org.apache.hadoop.util.ToolRunner; 046import org.apache.yetus.audience.InterfaceAudience; 047 048import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch; 049import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 050 051/** 052 * A simple performance evaluation tool for single client and MR scans 053 * and snapshot scans. 054 */ 055@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 056public class ScanPerformanceEvaluation extends AbstractHBaseTool { 057 058 private static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters"; 059 060 private String type; 061 private String file; 062 private String tablename; 063 private String snapshotName; 064 private String restoreDir; 065 private String caching; 066 067 @Override 068 public void setConf(Configuration conf) { 069 super.setConf(conf); 070 Path rootDir; 071 try { 072 rootDir = CommonFSUtils.getRootDir(conf); 073 rootDir.getFileSystem(conf); 074 } catch (IOException ex) { 075 throw new RuntimeException(ex); 076 } 077 } 078 079 @Override 080 protected void addOptions() { 081 this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce"); 082 this.addOptWithArg("f", "file", "the filename to read from"); 083 this.addOptWithArg("tn", "table", "the tablename to read from"); 084 this.addOptWithArg("sn", "snapshot", "the snapshot name to read from"); 085 this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot"); 086 this.addOptWithArg("ch", "caching", "scanner caching value"); 087 } 088 089 @Override 090 protected void processOptions(CommandLine cmd) { 091 type = cmd.getOptionValue("type"); 092 file = cmd.getOptionValue("file"); 093 tablename = cmd.getOptionValue("table"); 094 snapshotName = cmd.getOptionValue("snapshot"); 095 restoreDir = cmd.getOptionValue("restoredir"); 096 caching = cmd.getOptionValue("caching"); 097 } 098 099 protected void testHdfsStreaming(Path filename) throws IOException { 100 byte[] buf = new byte[1024]; 101 FileSystem fs = filename.getFileSystem(getConf()); 102 103 // read the file from start to finish 104 Stopwatch fileOpenTimer = Stopwatch.createUnstarted(); 105 Stopwatch streamTimer = Stopwatch.createUnstarted(); 106 107 fileOpenTimer.start(); 108 FSDataInputStream in = fs.open(filename); 109 fileOpenTimer.stop(); 110 111 long totalBytes = 0; 112 streamTimer.start(); 113 while (true) { 114 int read = in.read(buf); 115 if (read < 0) { 116 break; 117 } 118 totalBytes += read; 119 } 120 streamTimer.stop(); 121 122 double throughput = (double)totalBytes / streamTimer.elapsed(TimeUnit.SECONDS); 123 124 System.out.println("HDFS streaming: "); 125 System.out.println("total time to open: " + 126 fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 127 System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 128 System.out.println("total bytes: " + totalBytes + " bytes (" 129 + StringUtils.humanReadableInt(totalBytes) + ")"); 130 System.out.println("throghput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 131 } 132 133 private Scan getScan() { 134 Scan scan = new Scan(); // default scan settings 135 scan.setCacheBlocks(false); 136 scan.setMaxVersions(1); 137 scan.setScanMetricsEnabled(true); 138 if (caching != null) { 139 scan.setCaching(Integer.parseInt(caching)); 140 } 141 142 return scan; 143 } 144 145 public void testScan() throws IOException { 146 Stopwatch tableOpenTimer = Stopwatch.createUnstarted(); 147 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 148 Stopwatch scanTimer = Stopwatch.createUnstarted(); 149 150 tableOpenTimer.start(); 151 Connection connection = ConnectionFactory.createConnection(getConf()); 152 Table table = connection.getTable(TableName.valueOf(tablename)); 153 tableOpenTimer.stop(); 154 155 Scan scan = getScan(); 156 scanOpenTimer.start(); 157 ResultScanner scanner = table.getScanner(scan); 158 scanOpenTimer.stop(); 159 160 long numRows = 0; 161 long numCells = 0; 162 scanTimer.start(); 163 while (true) { 164 Result result = scanner.next(); 165 if (result == null) { 166 break; 167 } 168 numRows++; 169 170 numCells += result.rawCells().length; 171 } 172 scanTimer.stop(); 173 scanner.close(); 174 table.close(); 175 connection.close(); 176 177 ScanMetrics metrics = scan.getScanMetrics(); 178 long totalBytes = metrics.countOfBytesInResults.get(); 179 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 180 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); 181 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); 182 183 System.out.println("HBase scan: "); 184 System.out.println("total time to open table: " + 185 tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 186 System.out.println("total time to open scanner: " + 187 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 188 System.out.println("total time to scan: " + 189 scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 190 191 System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); 192 193 System.out.println("total bytes: " + totalBytes + " bytes (" 194 + StringUtils.humanReadableInt(totalBytes) + ")"); 195 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 196 System.out.println("total rows : " + numRows); 197 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); 198 System.out.println("total cells : " + numCells); 199 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); 200 } 201 202 203 public void testSnapshotScan() throws IOException { 204 Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted(); 205 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 206 Stopwatch scanTimer = Stopwatch.createUnstarted(); 207 208 Path restoreDir = new Path(this.restoreDir); 209 210 snapshotRestoreTimer.start(); 211 restoreDir.getFileSystem(conf).delete(restoreDir, true); 212 snapshotRestoreTimer.stop(); 213 214 Scan scan = getScan(); 215 scanOpenTimer.start(); 216 TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan); 217 scanOpenTimer.stop(); 218 219 long numRows = 0; 220 long numCells = 0; 221 scanTimer.start(); 222 while (true) { 223 Result result = scanner.next(); 224 if (result == null) { 225 break; 226 } 227 numRows++; 228 229 numCells += result.rawCells().length; 230 } 231 scanTimer.stop(); 232 scanner.close(); 233 234 ScanMetrics metrics = scanner.getScanMetrics(); 235 long totalBytes = metrics.countOfBytesInResults.get(); 236 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 237 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); 238 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); 239 240 System.out.println("HBase scan snapshot: "); 241 System.out.println("total time to restore snapshot: " + 242 snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 243 System.out.println("total time to open scanner: " + 244 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 245 System.out.println("total time to scan: " + 246 scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 247 248 System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); 249 250 System.out.println("total bytes: " + totalBytes + " bytes (" 251 + StringUtils.humanReadableInt(totalBytes) + ")"); 252 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 253 System.out.println("total rows : " + numRows); 254 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); 255 System.out.println("total cells : " + numCells); 256 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); 257 258 } 259 260 public static enum ScanCounter { 261 NUM_ROWS, 262 NUM_CELLS, 263 } 264 265 public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> { 266 @Override 267 protected void map(ImmutableBytesWritable key, Result value, 268 Context context) throws IOException, 269 InterruptedException { 270 context.getCounter(ScanCounter.NUM_ROWS).increment(1); 271 context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length); 272 } 273 } 274 275 public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException { 276 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 277 Stopwatch scanTimer = Stopwatch.createUnstarted(); 278 279 Scan scan = getScan(); 280 281 String jobName = "testScanMapReduce"; 282 283 Job job = new Job(conf); 284 job.setJobName(jobName); 285 286 job.setJarByClass(getClass()); 287 288 TableMapReduceUtil.initTableMapperJob( 289 this.tablename, 290 scan, 291 MyMapper.class, 292 NullWritable.class, 293 NullWritable.class, 294 job 295 ); 296 297 job.setNumReduceTasks(0); 298 job.setOutputKeyClass(NullWritable.class); 299 job.setOutputValueClass(NullWritable.class); 300 job.setOutputFormatClass(NullOutputFormat.class); 301 302 scanTimer.start(); 303 job.waitForCompletion(true); 304 scanTimer.stop(); 305 306 Counters counters = job.getCounters(); 307 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); 308 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue(); 309 310 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); 311 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 312 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); 313 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); 314 315 System.out.println("HBase scan mapreduce: "); 316 System.out.println("total time to open scanner: " + 317 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 318 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 319 320 System.out.println("total bytes: " + totalBytes + " bytes (" 321 + StringUtils.humanReadableInt(totalBytes) + ")"); 322 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 323 System.out.println("total rows : " + numRows); 324 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); 325 System.out.println("total cells : " + numCells); 326 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); 327 } 328 329 public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException { 330 Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); 331 Stopwatch scanTimer = Stopwatch.createUnstarted(); 332 333 Scan scan = getScan(); 334 335 String jobName = "testSnapshotScanMapReduce"; 336 337 Job job = new Job(conf); 338 job.setJobName(jobName); 339 340 job.setJarByClass(getClass()); 341 342 TableMapReduceUtil.initTableSnapshotMapperJob( 343 this.snapshotName, 344 scan, 345 MyMapper.class, 346 NullWritable.class, 347 NullWritable.class, 348 job, 349 true, 350 new Path(restoreDir) 351 ); 352 353 job.setNumReduceTasks(0); 354 job.setOutputKeyClass(NullWritable.class); 355 job.setOutputValueClass(NullWritable.class); 356 job.setOutputFormatClass(NullOutputFormat.class); 357 358 scanTimer.start(); 359 job.waitForCompletion(true); 360 scanTimer.stop(); 361 362 Counters counters = job.getCounters(); 363 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); 364 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue(); 365 366 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); 367 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); 368 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); 369 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); 370 371 System.out.println("HBase scan mapreduce: "); 372 System.out.println("total time to open scanner: " + 373 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 374 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); 375 376 System.out.println("total bytes: " + totalBytes + " bytes (" 377 + StringUtils.humanReadableInt(totalBytes) + ")"); 378 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); 379 System.out.println("total rows : " + numRows); 380 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); 381 System.out.println("total cells : " + numCells); 382 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); 383 } 384 385 @Override 386 protected int doWork() throws Exception { 387 if (type.equals("streaming")) { 388 testHdfsStreaming(new Path(file)); 389 } else if (type.equals("scan")){ 390 testScan(); 391 } else if (type.equals("snapshotscan")) { 392 testSnapshotScan(); 393 } else if (type.equals("scanmapreduce")) { 394 testScanMapReduce(); 395 } else if (type.equals("snapshotscanmapreduce")) { 396 testSnapshotScanMapReduce(); 397 } 398 return 0; 399 } 400 401 public static void main (String[] args) throws Exception { 402 int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args); 403 System.exit(ret); 404 } 405}