1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.HConstants;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.HBaseConfiguration;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.Scan;
35 import org.apache.hadoop.hbase.filter.CompareFilter;
36 import org.apache.hadoop.hbase.filter.Filter;
37 import org.apache.hadoop.hbase.filter.PrefixFilter;
38 import org.apache.hadoop.hbase.filter.RegexStringComparator;
39 import org.apache.hadoop.hbase.filter.RowFilter;
40 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.io.IntWritable;
43 import org.apache.hadoop.io.Text;
44 import org.apache.hadoop.mapreduce.Job;
45 import org.apache.hadoop.mapreduce.Reducer;
46 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
47 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
48 import org.apache.hadoop.util.GenericOptionsParser;
49
50 import com.google.common.base.Preconditions;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 @InterfaceAudience.Public
71 @InterfaceStability.Stable
72 public class CellCounter {
73 private static final Log LOG =
74 LogFactory.getLog(CellCounter.class.getName());
75
76
77
78
79
80 static final String NAME = "CellCounter";
81
82
83
84
85 static class CellCounterMapper
86 extends TableMapper<Text, IntWritable> {
87
88
89
90 public static enum Counters {
91 ROWS
92 }
93
94
95
96
97
98
99
100
101
102
103
104
105 @Override
106 public void map(ImmutableBytesWritable row, Result values,
107 Context context)
108 throws IOException {
109 Preconditions.checkState(values != null,
110 "values passed to the map is null");
111 String currentFamilyName = null;
112 String currentQualifierName = null;
113 String currentRowKey = null;
114 Configuration config = context.getConfiguration();
115 String separator = config.get("ReportSeparator",":");
116 try {
117 context.getCounter(Counters.ROWS).increment(1);
118 context.write(new Text("Total ROWS"), new IntWritable(1));
119
120 for (Cell value : values.listCells()) {
121 currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
122 String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
123 if (!thisRowFamilyName.equals(currentFamilyName)) {
124 currentFamilyName = thisRowFamilyName;
125 context.getCounter("CF", thisRowFamilyName).increment(1);
126 if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) {
127 context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
128 context.write(new Text(thisRowFamilyName), new IntWritable(1));
129 }
130 }
131 String thisRowQualifierName = thisRowFamilyName + separator
132 + Bytes.toStringBinary(CellUtil.cloneQualifier(value));
133 if (!thisRowQualifierName.equals(currentQualifierName)) {
134 currentQualifierName = thisRowQualifierName;
135 context.getCounter("CFQL", thisRowQualifierName).increment(1);
136 context.write(new Text("Total Qualifiers across all Rows"),
137 new IntWritable(1));
138 context.write(new Text(thisRowQualifierName), new IntWritable(1));
139
140 context.getCounter("QL_VERSIONS", currentRowKey + separator +
141 thisRowQualifierName).increment(1);
142 context.write(new Text(currentRowKey + separator
143 + thisRowQualifierName + "_Versions"), new IntWritable(1));
144
145 } else {
146
147 currentQualifierName = thisRowQualifierName;
148 context.getCounter("QL_VERSIONS", currentRowKey + separator +
149 thisRowQualifierName).increment(1);
150 context.write(new Text(currentRowKey + separator
151 + thisRowQualifierName + "_Versions"), new IntWritable(1));
152 }
153 }
154 } catch (InterruptedException e) {
155 e.printStackTrace();
156 }
157 }
158 }
159
160 static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
161 Key, IntWritable> {
162
163 private IntWritable result = new IntWritable();
164 public void reduce(Key key, Iterable<IntWritable> values,
165 Context context)
166 throws IOException, InterruptedException {
167 int sum = 0;
168 for (IntWritable val : values) {
169 sum += val.get();
170 }
171 result.set(sum);
172 context.write(key, result);
173 }
174 }
175
176
177
178
179
180
181
182
183
184 public static Job createSubmittableJob(Configuration conf, String[] args)
185 throws IOException {
186 String tableName = args[0];
187 Path outputDir = new Path(args[1]);
188 String reportSeparatorString = (args.length > 2) ? args[2]: ":";
189 conf.set("ReportSeparator", reportSeparatorString);
190 Job job = new Job(conf, NAME + "_" + tableName);
191 job.setJarByClass(CellCounter.class);
192 Scan scan = getConfiguredScanForJob(conf, args);
193 TableMapReduceUtil.initTableMapperJob(tableName, scan,
194 CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
195 job.setNumReduceTasks(1);
196 job.setMapOutputKeyClass(Text.class);
197 job.setMapOutputValueClass(IntWritable.class);
198 job.setOutputFormatClass(TextOutputFormat.class);
199 job.setOutputKeyClass(Text.class);
200 job.setOutputValueClass(IntWritable.class);
201 FileOutputFormat.setOutputPath(job, outputDir);
202 job.setReducerClass(IntSumReducer.class);
203 return job;
204 }
205
206 private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
207 Scan s = new Scan();
208
209 s.setMaxVersions(Integer.MAX_VALUE);
210 s.setCacheBlocks(false);
211
212 if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
213 s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
214 }
215
216 Filter rowFilter = getRowFilter(args);
217 if (rowFilter!= null) {
218 LOG.info("Setting Row Filter for counter.");
219 s.setFilter(rowFilter);
220 }
221
222 long timeRange[] = getTimeRange(args);
223 if (timeRange != null) {
224 LOG.info("Setting TimeRange for counter.");
225 s.setTimeRange(timeRange[0], timeRange[1]);
226 }
227 return s;
228 }
229
230
231 private static Filter getRowFilter(String[] args) {
232 Filter rowFilter = null;
233 String filterCriteria = (args.length > 3) ? args[3]: null;
234 if (filterCriteria == null) return null;
235 if (filterCriteria.startsWith("^")) {
236 String regexPattern = filterCriteria.substring(1, filterCriteria.length());
237 rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
238 } else {
239 rowFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
240 }
241 return rowFilter;
242 }
243
244 private static long[] getTimeRange(String[] args) throws IOException {
245 final String startTimeArgKey = "--starttime=";
246 final String endTimeArgKey = "--endtime=";
247 long startTime = 0L;
248 long endTime = 0L;
249
250 for (int i = 1; i < args.length; i++) {
251 System.out.println("i:" + i + "arg[i]" + args[i]);
252 if (args[i].startsWith(startTimeArgKey)) {
253 startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
254 }
255 if (args[i].startsWith(endTimeArgKey)) {
256 endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
257 }
258 }
259
260 if (startTime == 0 && endTime == 0)
261 return null;
262
263 endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
264 return new long [] {startTime, endTime};
265 }
266
267
268
269
270
271
272 public static void main(String[] args) throws Exception {
273 Configuration conf = HBaseConfiguration.create();
274 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
275 if (otherArgs.length < 2) {
276 System.err.println("ERROR: Wrong number of parameters: " + args.length);
277 System.err.println("Usage: CellCounter ");
278 System.err.println(" <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
279 "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
280 System.err.println(" Note: -D properties will be applied to the conf used. ");
281 System.err.println(" Additionally, the following SCAN properties can be specified");
282 System.err.println(" to get fine grained control on what is counted..");
283 System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
284 System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
285 "string : used to separate the rowId/column family name and qualifier name.");
286 System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
287 "operation to a limited subset of rows from the table based on regex or prefix pattern.");
288 System.exit(-1);
289 }
290 Job job = createSubmittableJob(conf, otherArgs);
291 System.exit(job.waitForCompletion(true) ? 0 : 1);
292 }
293 }