/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
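   * <p>A minimal usage sketch; the table name, column list, and mapper and value classes
   * below are illustrative only, not part of this API:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableMapJob("mytable", "info:name info:age",
   *   MyTableMap.class, Text.class, Text.class, job);
   * }</pre>
   *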
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

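  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */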
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format class to use for reading the table.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot.
   * It bypasses HBase servers and reads directly from snapshot files.
   *
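   * <p>A minimal usage sketch; the snapshot name, scan, restore path, and mapper class
   * below are illustrative only:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   * snapshotScans.put("snapshot1", Collections.singletonList(new Scan()));
   * TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMap.class,
   *   Text.class, Text.class, job, true, new Path("/tmp/snapshot-restore"));
   * }</pre>
   *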
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust.  Make sure the passed job is
   *                          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *                          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into.
   * @throws IOException When setting up the details fails.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
   *
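   * <p>A minimal usage sketch; the snapshot name, column list, restore path, and mapper
   * class below are illustrative only:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableSnapshotMapJob("mysnapshot", "info:name",
   *   MyTableMap.class, Text.class, Text.class, job, true, new Path("/tmp/snapshot-restore"));
   * }</pre>
   *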
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   * should have write permissions to this directory, and it should not be a subdirectory of the
   * HBase root directory. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
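   * <p>A minimal usage sketch; the table name and reducer class below are illustrative only:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableReduceJob("mytable", MyTableReduce.class, job);
   * }</pre>
   *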
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
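   * <p>A minimal usage sketch; the table name and reducer class below are illustrative only.
   * Passing {@link HRegionPartitioner} also caps the number of reduce tasks at the table's
   * region count:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableReduceJob("mytable", MyTableReduce.class, job,
   *   HRegionPartitioner.class);
   * }</pre>
   *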
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions =
        MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

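  /**
   * Sets up job credentials when security is enabled: propagates the launcher job's
   * delegation token file to the MapReduce job, and obtains an HBase authentication
   * token for the current user. A no-op when security is disabled.
   *
   * @param job  The job configuration to add the credentials to.
   * @throws IOException When obtaining the authentication token fails.
   */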
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions)
      job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return and cache with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Adds the HBase dependency jars, as well as jars for any of the job's configured
   * classes, to the job's distributed cache.
   *
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}