1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceStability;
26 import org.apache.hadoop.conf.Configurable;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.CellUtil;
30 import org.apache.hadoop.hbase.KeyValue;
31 import org.apache.hadoop.hbase.client.Result;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.mapreduce.Job;
36
37
38
39
40 @InterfaceAudience.Public
41 @InterfaceStability.Stable
42 public class GroupingTableMapper
43 extends TableMapper<ImmutableBytesWritable,Result> implements Configurable {
44
45
46
47
48
49 public static final String GROUP_COLUMNS =
50 "hbase.mapred.groupingtablemap.columns";
51
52
53 protected byte [][] columns;
54
55 private Configuration conf = null;
56
57
58
59
60
61
62
63
64
65
66
67
68
69 @SuppressWarnings("unchecked")
70 public static void initJob(String table, Scan scan, String groupColumns,
71 Class<? extends TableMapper> mapper, Job job) throws IOException {
72 TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
73 ImmutableBytesWritable.class, Result.class, job);
74 job.getConfiguration().set(GROUP_COLUMNS, groupColumns);
75 }
76
77
78
79
80
81
82
83
84
85
86
87
88 @Override
89 public void map(ImmutableBytesWritable key, Result value, Context context)
90 throws IOException, InterruptedException {
91 byte[][] keyVals = extractKeyValues(value);
92 if(keyVals != null) {
93 ImmutableBytesWritable tKey = createGroupKey(keyVals);
94 context.write(tKey, value);
95 }
96 }
97
98
99
100
101
102
103
104
105
106
107 protected byte[][] extractKeyValues(Result r) {
108 byte[][] keyVals = null;
109 ArrayList<byte[]> foundList = new ArrayList<byte[]>();
110 int numCols = columns.length;
111 if (numCols > 0) {
112 for (Cell value: r.listCells()) {
113 byte [] column = KeyValue.makeColumn(CellUtil.cloneFamily(value),
114 CellUtil.cloneQualifier(value));
115 for (int i = 0; i < numCols; i++) {
116 if (Bytes.equals(column, columns[i])) {
117 foundList.add(CellUtil.cloneValue(value));
118 break;
119 }
120 }
121 }
122 if(foundList.size() == numCols) {
123 keyVals = foundList.toArray(new byte[numCols][]);
124 }
125 }
126 return keyVals;
127 }
128
129
130
131
132
133
134
135
136
137 protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
138 if(vals == null) {
139 return null;
140 }
141 StringBuilder sb = new StringBuilder();
142 for(int i = 0; i < vals.length; i++) {
143 if(i > 0) {
144 sb.append(" ");
145 }
146 sb.append(Bytes.toString(vals[i]));
147 }
148 return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
149 }
150
151
152
153
154
155
156
157 @Override
158 public Configuration getConf() {
159 return conf;
160 }
161
162
163
164
165
166
167
168
169 @Override
170 public void setConf(Configuration configuration) {
171 this.conf = configuration;
172 String[] cols = conf.get(GROUP_COLUMNS, "").split(" ");
173 columns = new byte[cols.length][];
174 for(int i = 0; i < cols.length; i++) {
175 columns[i] = Bytes.toBytes(cols[i]);
176 }
177 }
178
179 }