/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format class to use.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // can not authenticate
      ioe.printStackTrace();
    }
  }
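
  /*
   * A minimal usage sketch for the map-job setup above. "MyMapper", "MyDriver",
   * and the table/column names are hypothetical; a real mapper would implement
   * TableMap<ImmutableBytesWritable, Result>:
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   *   TableMapReduceUtil.initTableMapJob("mytable", "info:a info:b",
   *     MyMapper.class, ImmutableBytesWritable.class, Result.class, job);
   *   job.setNumReduceTasks(0);  // map-only scan job
   */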

  /**
   * Sets up the job for reading from one or more table snapshots, with one or
   * more scans per snapshot. It bypasses hbase servers and reads directly from
   * snapshot files.
   *
   * @param snapshotScans map of snapshot name to scans on that snapshot.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust. Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to restore the snapshots into.
   * @throws IOException When setting up the details fails.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
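
  /*
   * A minimal usage sketch for the multi-snapshot setup above. The snapshot
   * name, mapper class, and restore directory are hypothetical:
   *
   *   Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   *   snapshotScans.put("snapshot1", Collections.singletonList(new Scan()));
   *   TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans,
   *     MyMapper.class, ImmutableBytesWritable.class, Result.class, job,
   *     true, new Path("/tmp/snapshot-restore"));
   */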

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase
   * servers and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust. Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files
   *          into. The current user should have write permissions to this
   *          directory, and it should not be a subdirectory of the HBase root
   *          directory. After the job is finished, the restore directory can
   *          be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job,
    boolean addDependencyJars, Path tmpRestoreDir)
    throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
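
  /*
   * A minimal usage sketch for the snapshot setup above. The snapshot name,
   * columns, mapper class, and restore directory are hypothetical; the restore
   * directory must be writable by the current user and must not sit under the
   * HBase root directory:
   *
   *   TableMapReduceUtil.initTableSnapshotMapJob("mysnapshot", "info:a",
   *     MyMapper.class, ImmutableBytesWritable.class, Result.class, job,
   *     true, new Path("/tmp/snapshot-restore"));
   */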

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
    throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   *          default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
    throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up
   * the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   *          default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions =
        MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }
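
  /*
   * A minimal usage sketch for the reduce-job setup above. "MyReducer" and the
   * table name are hypothetical; passing HRegionPartitioner.class also caps
   * the reduce task count at the table's region count:
   *
   *   TableMapReduceUtil.initTableReduceJob("mytable", MyReducer.class, job,
   *     HRegionPartitioner.class);
   */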

  /**
   * Obtains authentication tokens for the job, if security is enabled, and
   * stores them in the job's credentials.
   *
   * @param job  The current job configuration to adjust.
   * @throws IOException When obtaining the authentication tokens fails.
   */
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from the launcher job to the MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // obtain an HBase delegation token for the current user
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, JobConf job)
    throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumMapTasks(String table, JobConf job)
    throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
    throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
    throws IOException {
    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job  The current job configuration to adjust.
   * @param batchSize  The number of rows to return in batch with each scanner
   *          iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
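
  /*
   * A minimal usage sketch; 500 is an arbitrary illustration, not a tuning
   * recommendation. Larger values trade heap for fewer scanner round trips:
   *
   *   TableMapReduceUtil.setScannerCaching(job, 500);
   */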

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}