/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A job with just a map phase to count rows. Map outputs table rows IF the
 * input row has columns that have content.
 */
@InterfaceAudience.Public
public class RowCounter extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(RowCounter.class);

  /** Name of this 'program'. */
  static final String NAME = "rowcounter";

  private static final String JOB_NAME_CONF_KEY = "mapreduce.job.name";
  private static final String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count";

  /**
   * Mapper that runs the count.
   */
  static class RowCounterMapper
  extends TableMapper<ImmutableBytesWritable, Result> {

    /** Counter enumeration to count the actual rows. */
    public enum Counters {ROWS}

    /**
     * Maps the data. Nothing is written to the context; each invocation only
     * increments the {@link Counters#ROWS} counter by one.
     *
     * @param row  The current table row key.
     * @param values  The columns.
     * @param context  The current context.
     * @throws IOException When something is broken with the data.
     * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context)
     */
    @Override
    public void map(ImmutableBytesWritable row, Result values,
      Context context)
    throws IOException {
      // Count every row containing data, whether it's in qualifiers or values
      context.getCounter(Counters.ROWS).increment(1);
    }
  }

  /**
   * Sets up the actual job.
   * <p>
   * The first argument is the table name. Every following argument is either one of the
   * switches {@code --range=}, {@code --starttime=}, {@code --endtime=},
   * {@code --expected-count=}, or is taken as a column specification of the form
   * {@code family} or {@code family:qualifier}.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters.
   * @return The newly created job, or {@code null} if the range or time-range arguments
   *   are invalid (a usage message has been printed to stderr in that case).
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
  throws IOException {
    String tableName = args[0];
    List<MultiRowRangeFilter.RowRange> rowRangeList = null;
    long startTime = 0;
    long endTime = 0;

    // Collects every argument that is not a recognized switch, separated by single spaces.
    StringBuilder sb = new StringBuilder();

    final String rangeSwitch = "--range=";
    final String startTimeArgKey = "--starttime=";
    final String endTimeArgKey = "--endtime=";
    final String expectedCountArg = "--expected-count=";

    // First argument is table name, starting from second
    for (int i = 1; i < args.length; i++) {
      if (args[i].startsWith(rangeSwitch)) {
        try {
          rowRangeList = parseRowRangeParameter(args[i], rangeSwitch);
        } catch (IllegalArgumentException e) {
          // parseRowRangeParameter has already printed the usage message.
          return null;
        }
        continue;
      }
      if (args[i].startsWith(startTimeArgKey)) {
        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
        continue;
      }
      if (args[i].startsWith(endTimeArgKey)) {
        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
        continue;
      }
      if (args[i].startsWith(expectedCountArg)) {
        conf.setLong(EXPECTED_COUNT_KEY,
            Long.parseLong(args[i].substring(expectedCountArg.length())));
        continue;
      }
      // if no switch, assume column names
      sb.append(args[i]);
      sb.append(" ");
    }
    // An end time of 0 means "not specified" and is mapped to LATEST_TIMESTAMP when the scan
    // time range is set below, so the ordering is only validated for an explicit end time.
    // Without this guard, passing --starttime alone was always rejected.
    if (endTime != 0 && endTime < startTime) {
      printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
      return null;
    }

    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(RowCounter.class);
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    setScanFilter(scan, rowRangeList);
    if (sb.length() > 0) {
      for (String columnName : sb.toString().trim().split(" ")) {
        String family = StringUtils.substringBefore(columnName, ":");
        String qualifier = StringUtils.substringAfter(columnName, ":");

        // No (or blank) qualifier selects the whole family.
        if (StringUtils.isBlank(qualifier)) {
          scan.addFamily(Bytes.toBytes(family));
        } else {
          scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        }
      }
    }
    scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
    job.setOutputFormatClass(NullOutputFormat.class);
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Parses the value of the range switch into a list of row ranges.
   * Ranges are separated by {@code ;}; within a range the start and end keys are separated
   * by a single {@code ,}. The start key is inclusive, the end key exclusive.
   *
   * @param arg  The complete command line argument, including the switch prefix.
   * @param rangeSwitch  The switch prefix that is stripped from {@code arg}.
   * @return The parsed row ranges.
   * @throws IllegalArgumentException If a range does not consist of exactly two
   *   comma-separated parts; a usage message is printed before throwing.
   */
  private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(
    String arg, String rangeSwitch) {
    final String[] ranges = arg.substring(rangeSwitch.length()).split(";");
    final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>();
    for (String range : ranges) {
      String[] startEnd = range.split(",", 2);
      if (startEnd.length != 2 || startEnd[1].contains(",")) {
        printUsage("Please specify range in such format as \"--range=a,b\" " +
            "or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
        throw new IllegalArgumentException("Wrong range specification: " + range);
      }
      String startKey = startEnd[0];
      String endKey = startEnd[1];
      rangeList.add(new MultiRowRangeFilter.RowRange(
        Bytes.toBytesBinary(startKey), true,
        Bytes.toBytesBinary(endKey), false));
    }
    return rangeList;
  }

  /**
   * Sets filter {@link FilterBase} to the {@link Scan} instance.
   * If provided rowRangeList contains more than one element,
   * method sets filter which is instance of {@link MultiRowRangeFilter}.
   * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}.
   * If rowRangeList contains exactly one element, startRow and stopRow are set to the scan.
   * @param scan  The scan to configure.
   * @param rowRangeList  The row ranges to restrict the scan to; may be {@code null} or
   *   empty, in which case no row restriction is applied.
   */
  private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
    final int size = rowRangeList == null ? 0 : rowRangeList.size();
    if (size <= 1) {
      scan.setFilter(new FirstKeyOnlyFilter());
    }
    if (size == 1) {
      MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
      scan.setStartRow(range.getStartRow()); //inclusive
      scan.setStopRow(range.getStopRow());   //exclusive
    } else if (size > 1) {
      scan.setFilter(new MultiRowRangeFilter(rowRangeList));
    }
  }

  /**
   * Prints an error message to stderr followed by the usage text.
   *
   * @param errorMessage Can attach a message when error occurs.
   */
  private static void printUsage(String errorMessage) {
    System.err.println("ERROR: " + errorMessage);
    printUsage();
  }

  /**
   * Prints usage without error message.
   * Note that we don't document --expected-count, because it's intended for test.
   */
  private static void printUsage() {
    System.err.println("Usage: hbase rowcounter [options] <tablename> "
        + "[--starttime=<start> --endtime=<end>] "
        + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
    System.err.println("For performance consider the following options:\n"
        + "-Dhbase.client.scanner.caching=100\n"
        + "-Dmapreduce.map.speculative=false");
  }

  /**
   * Creates the job from the arguments, runs it and waits for it to complete. If an expected
   * count was configured, the job is also failed when the counted rows differ from it.
   *
   * @param args The command line parameters.
   * @return {@code -1} if the arguments are invalid, {@code 0} if the job succeeded (and the
   *   expected count, if any, matched), {@code 1} otherwise.
   * @throws Exception When running the job fails.
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 1) {
      printUsage("Wrong number of parameters: " + args.length);
      return -1;
    }
    Job job = createSubmittableJob(getConf(), args);
    if (job == null) {
      return -1;
    }
    boolean success = job.waitForCompletion(true);
    // -1 is the sentinel for "no expected count configured".
    final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1);
    if (success && expectedCount != -1) {
      final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
      success = expectedCount == counter.getValue();
      if (!success) {
        LOG.error("Failing job because count of '{}' does not match expected count of '{}'",
            counter.getValue(), expectedCount);
      }
    }
    return (success ? 0 : 1);
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(), args);
    System.exit(errCode);
  }

}