1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.util.Set;
23 import java.util.TreeSet;
24
25 import org.apache.commons.lang.StringUtils;
26 import org.apache.hadoop.hbase.HConstants;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.classification.InterfaceStability;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HBaseConfiguration;
31 import org.apache.hadoop.hbase.client.Result;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
34 import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.mapreduce.Job;
38 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
39 import org.apache.hadoop.util.GenericOptionsParser;
40
41
42
43
44
45 @InterfaceAudience.Public
46 @InterfaceStability.Stable
47 public class RowCounter {
48
49
50 static final String NAME = "rowcounter";
51
52
53
54
55 static class RowCounterMapper
56 extends TableMapper<ImmutableBytesWritable, Result> {
57
58
59 public static enum Counters {ROWS}
60
61
62
63
64
65
66
67
68
69
70
71 @Override
72 public void map(ImmutableBytesWritable row, Result values,
73 Context context)
74 throws IOException {
75
76 context.getCounter(Counters.ROWS).increment(1);
77 }
78 }
79
80
81
82
83
84
85
86
87
88 public static Job createSubmittableJob(Configuration conf, String[] args)
89 throws IOException {
90 String tableName = args[0];
91 String startKey = null;
92 String endKey = null;
93 long startTime = 0;
94 long endTime = 0;
95
96 StringBuilder sb = new StringBuilder();
97
98 final String rangeSwitch = "--range=";
99 final String startTimeArgKey = "--starttime=";
100 final String endTimeArgKey = "--endtime=";
101
102
103 for (int i = 1; i < args.length; i++) {
104 if (args[i].startsWith(rangeSwitch)) {
105 String[] startEnd = args[i].substring(rangeSwitch.length()).split(",", 2);
106 if (startEnd.length != 2 || startEnd[1].contains(",")) {
107 printUsage("Please specify range in such format as \"--range=a,b\" " +
108 "or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
109 return null;
110 }
111 startKey = startEnd[0];
112 endKey = startEnd[1];
113 }
114 if (startTime < endTime) {
115 printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
116 return null;
117 }
118 if (args[i].startsWith(startTimeArgKey)) {
119 startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
120 continue;
121 }
122 if (args[i].startsWith(endTimeArgKey)) {
123 endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
124 continue;
125 }
126 else {
127
128 sb.append(args[i]);
129 sb.append(" ");
130 }
131 }
132
133 Job job = new Job(conf, NAME + "_" + tableName);
134 job.setJarByClass(RowCounter.class);
135 Scan scan = new Scan();
136 scan.setCacheBlocks(false);
137 Set<byte []> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
138 if (startKey != null && !startKey.equals("")) {
139 scan.setStartRow(Bytes.toBytes(startKey));
140 }
141 if (endKey != null && !endKey.equals("")) {
142 scan.setStopRow(Bytes.toBytes(endKey));
143 }
144 if (sb.length() > 0) {
145 for (String columnName : sb.toString().trim().split(" ")) {
146 String family = StringUtils.substringBefore(columnName, ":");
147 String qualifier = StringUtils.substringAfter(columnName, ":");
148
149 if (StringUtils.isBlank(qualifier)) {
150 scan.addFamily(Bytes.toBytes(family));
151 }
152 else {
153 scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
154 }
155 }
156 }
157
158
159
160 if (qualifiers.size() == 0) {
161 scan.setFilter(new FirstKeyOnlyFilter());
162 } else {
163 scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
164 }
165 scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
166 job.setOutputFormatClass(NullOutputFormat.class);
167 TableMapReduceUtil.initTableMapperJob(tableName, scan,
168 RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
169 job.setNumReduceTasks(0);
170 return job;
171 }
172
173
174
175
176 private static void printUsage(String errorMessage) {
177 System.err.println("ERROR: " + errorMessage);
178 printUsage();
179 }
180
181
182
183
184 private static void printUsage() {
185 System.err.println("Usage: RowCounter [options] <tablename> " +
186 "[--starttime=[start] --endtime=[end] " +
187 "[--range=[startKey],[endKey]] [<column1> <column2>...]");
188 System.err.println("For performance consider the following options:\n"
189 + "-Dhbase.client.scanner.caching=100\n"
190 + "-Dmapreduce.map.speculative=false");
191 }
192
193
194
195
196
197
198
199 public static void main(String[] args) throws Exception {
200 Configuration conf = HBaseConfiguration.create();
201 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
202 if (otherArgs.length < 1) {
203 printUsage("Wrong number of parameters: " + args.length);
204 System.exit(-1);
205 }
206 Job job = createSubmittableJob(conf, otherArgs);
207 if (job == null) {
208 System.exit(-1);
209 }
210 System.exit(job.waitForCompletion(true) ? 0 : 1);
211 }
212 }