1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceStability;
26 import org.apache.hadoop.hbase.Cell;
27 import org.apache.hadoop.hbase.CellUtil;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.hadoop.mapred.JobConf;
33 import org.apache.hadoop.mapred.MapReduceBase;
34 import org.apache.hadoop.mapred.OutputCollector;
35 import org.apache.hadoop.mapred.Reporter;
36
37
38
39
40
41 @InterfaceAudience.Public
42 @InterfaceStability.Stable
43 public class GroupingTableMap
44 extends MapReduceBase
45 implements TableMap<ImmutableBytesWritable,Result> {
46
47
48
49
50
51 public static final String GROUP_COLUMNS =
52 "hbase.mapred.groupingtablemap.columns";
53
54 protected byte [][] columns;
55
56
57
58
59
60
61
62
63
64
65
66
67 @SuppressWarnings("unchecked")
68 public static void initJob(String table, String columns, String groupColumns,
69 Class<? extends TableMap> mapper, JobConf job) {
70
71 TableMapReduceUtil.initTableMapJob(table, columns, mapper,
72 ImmutableBytesWritable.class, Result.class, job);
73 job.set(GROUP_COLUMNS, groupColumns);
74 }
75
76 @Override
77 public void configure(JobConf job) {
78 super.configure(job);
79 String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
80 columns = new byte[cols.length][];
81 for(int i = 0; i < cols.length; i++) {
82 columns[i] = Bytes.toBytes(cols[i]);
83 }
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97 public void map(ImmutableBytesWritable key, Result value,
98 OutputCollector<ImmutableBytesWritable,Result> output,
99 Reporter reporter) throws IOException {
100
101 byte[][] keyVals = extractKeyValues(value);
102 if(keyVals != null) {
103 ImmutableBytesWritable tKey = createGroupKey(keyVals);
104 output.collect(tKey, value);
105 }
106 }
107
108
109
110
111
112
113
114
115
116
117 protected byte[][] extractKeyValues(Result r) {
118 byte[][] keyVals = null;
119 ArrayList<byte[]> foundList = new ArrayList<byte[]>();
120 int numCols = columns.length;
121 if (numCols > 0) {
122 for (Cell value: r.listCells()) {
123 byte [] column = KeyValue.makeColumn(CellUtil.cloneFamily(value),
124 CellUtil.cloneQualifier(value));
125 for (int i = 0; i < numCols; i++) {
126 if (Bytes.equals(column, columns[i])) {
127 foundList.add(CellUtil.cloneValue(value));
128 break;
129 }
130 }
131 }
132 if(foundList.size() == numCols) {
133 keyVals = foundList.toArray(new byte[numCols][]);
134 }
135 }
136 return keyVals;
137 }
138
139
140
141
142
143
144
145
146 protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
147 if(vals == null) {
148 return null;
149 }
150 StringBuilder sb = new StringBuilder();
151 for(int i = 0; i < vals.length; i++) {
152 if(i > 0) {
153 sb.append(" ");
154 }
155 sb.append(Bytes.toString(vals[i]));
156 }
157 return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
158 }
159 }