001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import java.io.IOException; 022import java.util.List; 023import java.util.ArrayList; 024 025import org.apache.commons.lang3.StringUtils; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.util.AbstractHBaseTool; 028import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 029import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser; 030import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 031import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; 032import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 033import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException; 034import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.client.Result; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.filter.FilterBase; 042import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 043import org.apache.hadoop.hbase.filter.MultiRowRangeFilter; 044import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.mapreduce.Counter; 047import org.apache.hadoop.mapreduce.Job; 048import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 049 050/** 051 * A job with a just a map phase to count rows. Map outputs table rows IF the 052 * input row has columns that have content. 053 */ 054@InterfaceAudience.Public 055public class RowCounter extends AbstractHBaseTool { 056 057 private static final Logger LOG = LoggerFactory.getLogger(RowCounter.class); 058 059 /** Name of this 'program'. */ 060 static final String NAME = "rowcounter"; 061 062 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; 063 private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count"; 064 065 private final static String OPT_START_TIME = "starttime"; 066 private final static String OPT_END_TIME = "endtime"; 067 private final static String OPT_RANGE = "range"; 068 private final static String OPT_EXPECTED_COUNT = "expectedCount"; 069 070 private String tableName; 071 private List<MultiRowRangeFilter.RowRange> rowRangeList; 072 private long startTime; 073 private long endTime; 074 private long expectedCount; 075 private List<String> columns = new ArrayList<>(); 076 077 /** 078 * Mapper that runs the count. 079 */ 080 static class RowCounterMapper 081 extends TableMapper<ImmutableBytesWritable, Result> { 082 083 /** Counter enumeration to count the actual rows. */ 084 public static enum Counters {ROWS} 085 086 /** 087 * Maps the data. 088 * 089 * @param row The current table row key. 090 * @param values The columns. 091 * @param context The current context. 092 * @throws IOException When something is broken with the data. 093 * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context) 094 */ 095 @Override 096 public void map(ImmutableBytesWritable row, Result values, 097 Context context) 098 throws IOException { 099 // Count every row containing data, whether it's in qualifiers or values 100 context.getCounter(Counters.ROWS).increment(1); 101 } 102 } 103 104 /** 105 * Sets up the actual job. 106 * 107 * @param conf The current configuration. 108 * @return The newly created job. 109 * @throws IOException When setting up the job fails. 110 */ 111 public Job createSubmittableJob(Configuration conf) throws IOException { 112 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); 113 job.setJarByClass(RowCounter.class); 114 Scan scan = new Scan(); 115 scan.setCacheBlocks(false); 116 setScanFilter(scan, rowRangeList); 117 118 for (String columnName : this.columns) { 119 String family = StringUtils.substringBefore(columnName, ":"); 120 String qualifier = StringUtils.substringAfter(columnName, ":"); 121 if (StringUtils.isBlank(qualifier)) { 122 scan.addFamily(Bytes.toBytes(family)); 123 } else { 124 scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); 125 } 126 } 127 128 if(this.expectedCount >= 0) { 129 conf.setLong(EXPECTED_COUNT_KEY, this.expectedCount); 130 } 131 132 scan.setTimeRange(startTime, endTime); 133 job.setOutputFormatClass(NullOutputFormat.class); 134 TableMapReduceUtil.initTableMapperJob(tableName, scan, 135 RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); 136 job.setNumReduceTasks(0); 137 return job; 138 } 139 140 /** 141 * Sets up the actual job. 142 * 143 * @param conf The current configuration. 144 * @param args The command line parameters. 145 * @return The newly created job. 146 * @throws IOException When setting up the job fails. 147 * @deprecated as of release 2.3.0. Will be removed on 4.0.0. Please use main method instead. 148 */ 149 @Deprecated 150 public static Job createSubmittableJob(Configuration conf, String[] args) 151 throws IOException { 152 String tableName = args[0]; 153 List<MultiRowRangeFilter.RowRange> rowRangeList = null; 154 long startTime = 0; 155 long endTime = 0; 156 157 StringBuilder sb = new StringBuilder(); 158 159 final String rangeSwitch = "--range="; 160 final String startTimeArgKey = "--starttime="; 161 final String endTimeArgKey = "--endtime="; 162 final String expectedCountArg = "--expected-count="; 163 164 // First argument is table name, starting from second 165 for (int i = 1; i < args.length; i++) { 166 if (args[i].startsWith(rangeSwitch)) { 167 try { 168 rowRangeList = parseRowRangeParameter( 169 args[i].substring(args[1].indexOf(rangeSwitch)+rangeSwitch.length())); 170 } catch (IllegalArgumentException e) { 171 return null; 172 } 173 continue; 174 } 175 if (args[i].startsWith(startTimeArgKey)) { 176 startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); 177 continue; 178 } 179 if (args[i].startsWith(endTimeArgKey)) { 180 endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); 181 continue; 182 } 183 if (args[i].startsWith(expectedCountArg)) { 184 conf.setLong(EXPECTED_COUNT_KEY, 185 Long.parseLong(args[i].substring(expectedCountArg.length()))); 186 continue; 187 } 188 // if no switch, assume column names 189 sb.append(args[i]); 190 sb.append(" "); 191 } 192 if (endTime < startTime) { 193 printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); 194 return null; 195 } 196 197 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); 198 job.setJarByClass(RowCounter.class); 199 Scan scan = new Scan(); 200 scan.setCacheBlocks(false); 201 setScanFilter(scan, rowRangeList); 202 if (sb.length() > 0) { 203 for (String columnName : sb.toString().trim().split(" ")) { 204 String family = StringUtils.substringBefore(columnName, ":"); 205 String qualifier = StringUtils.substringAfter(columnName, ":"); 206 207 if (StringUtils.isBlank(qualifier)) { 208 scan.addFamily(Bytes.toBytes(family)); 209 } 210 else { 211 scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); 212 } 213 } 214 } 215 scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); 216 job.setOutputFormatClass(NullOutputFormat.class); 217 TableMapReduceUtil.initTableMapperJob(tableName, scan, 218 RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); 219 job.setNumReduceTasks(0); 220 return job; 221 } 222 223 /** 224 * Prints usage without error message. 225 * Note that we don't document --expected-count, because it's intended for test. 226 */ 227 private static void printUsage(String errorMessage) { 228 System.err.println("ERROR: " + errorMessage); 229 System.err.println("Usage: hbase rowcounter [options] <tablename> " 230 + "[--starttime=<start> --endtime=<end>] " 231 + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]"); 232 System.err.println("For performance consider the following options:\n" 233 + "-Dhbase.client.scanner.caching=100\n" 234 + "-Dmapreduce.map.speculative=false"); 235 } 236 237 private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String arg) { 238 final List<String> rangesSplit = Splitter.on(";").splitToList(arg); 239 final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>(); 240 for (String range : rangesSplit) { 241 if(range!=null && !range.isEmpty()) { 242 List<String> startEnd = Splitter.on(",").splitToList(range); 243 if (startEnd.size() != 2 || startEnd.get(1).contains(",")) { 244 throw new IllegalArgumentException("Wrong range specification: " + range); 245 } 246 String startKey = startEnd.get(0); 247 String endKey = startEnd.get(1); 248 rangeList.add(new MultiRowRangeFilter.RowRange(Bytes.toBytesBinary(startKey), 249 true, Bytes.toBytesBinary(endKey), false)); 250 } 251 } 252 return rangeList; 253 } 254 255 /** 256 * Sets filter {@link FilterBase} to the {@link Scan} instance. 257 * If provided rowRangeList contains more than one element, 258 * method sets filter which is instance of {@link MultiRowRangeFilter}. 259 * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. 260 * If rowRangeList contains exactly one element, startRow and stopRow are set to the scan. 261 * @param scan 262 * @param rowRangeList 263 */ 264 private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) { 265 final int size = rowRangeList == null ? 0 : rowRangeList.size(); 266 if (size <= 1) { 267 scan.setFilter(new FirstKeyOnlyFilter()); 268 } 269 if (size == 1) { 270 MultiRowRangeFilter.RowRange range = rowRangeList.get(0); 271 scan.setStartRow(range.getStartRow()); //inclusive 272 scan.setStopRow(range.getStopRow()); //exclusive 273 } else if (size > 1) { 274 scan.setFilter(new MultiRowRangeFilter(rowRangeList)); 275 } 276 } 277 278 @Override 279 protected void printUsage() { 280 StringBuilder footerBuilder = new StringBuilder(); 281 footerBuilder.append("For performance, consider the following configuration properties:\n"); 282 footerBuilder.append("-Dhbase.client.scanner.caching=100\n"); 283 footerBuilder.append("-Dmapreduce.map.speculative=false\n"); 284 printUsage("hbase rowcounter <tablename> [options] [<column1> <column2>...]", 285 "Options:", footerBuilder.toString()); 286 } 287 288 @Override 289 protected void printUsage(final String usageStr, final String usageHeader, 290 final String usageFooter) { 291 HelpFormatter helpFormatter = new HelpFormatter(); 292 helpFormatter.setWidth(120); 293 helpFormatter.setOptionComparator(new AbstractHBaseTool.OptionsOrderComparator()); 294 helpFormatter.setLongOptSeparator("="); 295 helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter); 296 } 297 298 @Override 299 protected void addOptions() { 300 Option startTimeOption = Option.builder(null).valueSeparator('=').hasArg(true). 301 desc("starting time filter to start counting rows from.").longOpt(OPT_START_TIME).build(); 302 Option endTimeOption = Option.builder(null).valueSeparator('=').hasArg(true). 303 desc("end time filter limit, to only count rows up to this timestamp."). 304 longOpt(OPT_END_TIME).build(); 305 Option rangeOption = Option.builder(null).valueSeparator('=').hasArg(true). 306 desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build(); 307 Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true). 308 desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build(); 309 addOption(startTimeOption); 310 addOption(endTimeOption); 311 addOption(rangeOption); 312 addOption(expectedOption); 313 } 314 315 @Override 316 protected void processOptions(CommandLine cmd) throws IllegalArgumentException{ 317 this.tableName = cmd.getArgList().get(0); 318 if(cmd.getOptionValue(OPT_RANGE)!=null) { 319 this.rowRangeList = parseRowRangeParameter(cmd.getOptionValue(OPT_RANGE)); 320 } 321 this.endTime = cmd.getOptionValue(OPT_END_TIME) == null ? HConstants.LATEST_TIMESTAMP : 322 Long.parseLong(cmd.getOptionValue(OPT_END_TIME)); 323 this.expectedCount = cmd.getOptionValue(OPT_EXPECTED_COUNT) == null ? Long.MIN_VALUE : 324 Long.parseLong(cmd.getOptionValue(OPT_EXPECTED_COUNT)); 325 this.startTime = cmd.getOptionValue(OPT_START_TIME) == null ? 0 : 326 Long.parseLong(cmd.getOptionValue(OPT_START_TIME)); 327 328 for(int i=1; i<cmd.getArgList().size(); i++){ 329 String argument = cmd.getArgList().get(i); 330 if(!argument.startsWith("-")){ 331 this.columns.add(argument); 332 } 333 } 334 335 if (endTime < startTime) { 336 throw new IllegalArgumentException("--endtime=" + endTime + 337 " needs to be greater than --starttime=" + startTime); 338 } 339 } 340 341 @Override 342 protected void processOldArgs(List<String> args) { 343 List<String> copiedArgs = new ArrayList<>(args); 344 args.removeAll(copiedArgs); 345 for(String arg : copiedArgs){ 346 if(arg.startsWith("-") && arg.contains("=")){ 347 String[] kv = arg.split("="); 348 args.add(kv[0]); 349 args.add(kv[1]); 350 } else { 351 args.add(arg); 352 } 353 } 354 } 355 356 @Override 357 protected int doWork() throws Exception { 358 Job job = createSubmittableJob(getConf()); 359 if (job == null) { 360 return -1; 361 } 362 boolean success = job.waitForCompletion(true); 363 final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1); 364 if (success && expectedCount != -1) { 365 final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS); 366 success = expectedCount == counter.getValue(); 367 if (!success) { 368 LOG.error("Failing job because count of '" + counter.getValue() + 369 "' does not match expected count of '" + expectedCount + "'"); 370 } 371 } 372 return (success ? 0 : 1); 373 } 374 375 /** 376 * Main entry point. 377 * @param args The command line parameters. 378 * @throws Exception When running the job fails. 379 */ 380 public static void main(String[] args) throws Exception { 381 new RowCounter().doStaticMain(args); 382 } 383 384 static class RowCounterCommandLineParser extends BasicParser { 385 386 @Override 387 protected void checkRequiredOptions() throws MissingOptionException { 388 if(this.cmd.getArgList().size()<1 || this.cmd.getArgList().get(0).startsWith("-")){ 389 throw new MissingOptionException("First argument must be a valid table name."); 390 } 391 } 392 } 393 394 @Override 395 protected CommandLineParser newParser() { 396 return new RowCounterCommandLineParser(); 397 } 398 399}