1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.IOException;
22
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceStability;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.HBaseConfiguration;
28 import org.apache.hadoop.hbase.MetaTableAccessor;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.Connection;
31 import org.apache.hadoop.hbase.client.ConnectionFactory;
32 import org.apache.hadoop.hbase.client.Put;
33 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34 import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
35 import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
36 import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
37 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
38 import org.apache.hadoop.hbase.security.User;
39 import org.apache.hadoop.hbase.security.UserProvider;
40 import org.apache.hadoop.hbase.security.token.TokenUtil;
41 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
42 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
43 import org.apache.hadoop.io.Text;
44 import org.apache.hadoop.mapred.FileInputFormat;
45 import org.apache.hadoop.mapred.InputFormat;
46 import org.apache.hadoop.mapred.JobConf;
47 import org.apache.hadoop.mapred.OutputFormat;
48 import org.apache.hadoop.mapred.TextInputFormat;
49 import org.apache.hadoop.mapred.TextOutputFormat;
50 import org.apache.hadoop.security.token.Token;
51 import org.apache.zookeeper.KeeperException;
52
53
54
55
56 @InterfaceAudience.Public
57 @InterfaceStability.Stable
58 @SuppressWarnings({ "rawtypes", "unchecked" })
59 public class TableMapReduceUtil {
60
61
62
63
64
65
66
67
68
69
70
71
72 public static void initTableMapJob(String table, String columns,
73 Class<? extends TableMap> mapper,
74 Class<?> outputKeyClass,
75 Class<?> outputValueClass, JobConf job) {
76 initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
77 true, TableInputFormat.class);
78 }
79
80 public static void initTableMapJob(String table, String columns,
81 Class<? extends TableMap> mapper,
82 Class<?> outputKeyClass,
83 Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
84 initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
85 addDependencyJars, TableInputFormat.class);
86 }
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public static void initTableMapJob(String table, String columns,
102 Class<? extends TableMap> mapper,
103 Class<?> outputKeyClass,
104 Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
105 Class<? extends InputFormat> inputFormat) {
106
107 job.setInputFormat(inputFormat);
108 job.setMapOutputValueClass(outputValueClass);
109 job.setMapOutputKeyClass(outputKeyClass);
110 job.setMapperClass(mapper);
111 job.setStrings("io.serializations", job.get("io.serializations"),
112 MutationSerialization.class.getName(), ResultSerialization.class.getName());
113 FileInputFormat.addInputPaths(job, table);
114 job.set(TableInputFormat.COLUMN_LIST, columns);
115 if (addDependencyJars) {
116 try {
117 addDependencyJars(job);
118 } catch (IOException e) {
119 e.printStackTrace();
120 }
121 }
122 try {
123 initCredentials(job);
124 } catch (IOException ioe) {
125
126 ioe.printStackTrace();
127 }
128 }
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public static void initTableSnapshotMapJob(String snapshotName, String columns,
150 Class<? extends TableMap> mapper,
151 Class<?> outputKeyClass,
152 Class<?> outputValueClass, JobConf job,
153 boolean addDependencyJars, Path tmpRestoreDir)
154 throws IOException {
155 TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
156 initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
157 addDependencyJars, TableSnapshotInputFormat.class);
158 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
159 }
160
161
162
163
164
165
166
167
168
169
170 public static void initTableReduceJob(String table,
171 Class<? extends TableReduce> reducer, JobConf job)
172 throws IOException {
173 initTableReduceJob(table, reducer, job, null);
174 }
175
176
177
178
179
180
181
182
183
184
185
186
187 public static void initTableReduceJob(String table,
188 Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
189 throws IOException {
190 initTableReduceJob(table, reducer, job, partitioner, true);
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206 public static void initTableReduceJob(String table,
207 Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
208 boolean addDependencyJars) throws IOException {
209 job.setOutputFormat(TableOutputFormat.class);
210 job.setReducerClass(reducer);
211 job.set(TableOutputFormat.OUTPUT_TABLE, table);
212 job.setOutputKeyClass(ImmutableBytesWritable.class);
213 job.setOutputValueClass(Put.class);
214 job.setStrings("io.serializations", job.get("io.serializations"),
215 MutationSerialization.class.getName(), ResultSerialization.class.getName());
216 if (partitioner == HRegionPartitioner.class) {
217 job.setPartitionerClass(HRegionPartitioner.class);
218 int regions =
219 MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
220 if (job.getNumReduceTasks() > regions) {
221 job.setNumReduceTasks(regions);
222 }
223 } else if (partitioner != null) {
224 job.setPartitionerClass(partitioner);
225 }
226 if (addDependencyJars) {
227 addDependencyJars(job);
228 }
229 initCredentials(job);
230 }
231
232 public static void initCredentials(JobConf job) throws IOException {
233 UserProvider userProvider = UserProvider.instantiate(job);
234 if (userProvider.isHadoopSecurityEnabled()) {
235
236 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
237 job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
238 }
239 }
240
241 if (userProvider.isHBaseSecurityEnabled()) {
242 Connection conn = ConnectionFactory.createConnection(job);
243 try {
244
245 User user = userProvider.getCurrent();
246 TokenUtil.addTokenForJob(conn, job, user);
247 } catch (InterruptedException ie) {
248 ie.printStackTrace();
249 Thread.currentThread().interrupt();
250 } finally {
251 conn.close();
252 }
253 }
254 }
255
256
257
258
259
260
261
262
263
264
265 public static void limitNumReduceTasks(String table, JobConf job)
266 throws IOException {
267 int regions =
268 MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
269 if (job.getNumReduceTasks() > regions)
270 job.setNumReduceTasks(regions);
271 }
272
273
274
275
276
277
278
279
280
281
282 public static void limitNumMapTasks(String table, JobConf job)
283 throws IOException {
284 int regions =
285 MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
286 if (job.getNumMapTasks() > regions)
287 job.setNumMapTasks(regions);
288 }
289
290
291
292
293
294
295
296
297
298 public static void setNumReduceTasks(String table, JobConf job)
299 throws IOException {
300 job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
301 TableName.valueOf(table)));
302 }
303
304
305
306
307
308
309
310
311
312 public static void setNumMapTasks(String table, JobConf job)
313 throws IOException {
314 job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
315 TableName.valueOf(table)));
316 }
317
318
319
320
321
322
323
324
325
326
327 public static void setScannerCaching(JobConf job, int batchSize) {
328 job.setInt("hbase.client.scanner.caching", batchSize);
329 }
330
331
332
333
334 public static void addDependencyJars(JobConf job) throws IOException {
335 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
336 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
337 job,
338
339
340 job.getMapOutputKeyClass(),
341 job.getMapOutputValueClass(),
342 job.getOutputKeyClass(),
343 job.getOutputValueClass(),
344 job.getPartitionerClass(),
345 job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
346 job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
347 job.getCombinerClass());
348 }
349 }