/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.KeeperException;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

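  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */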
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format to use.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }
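  /*
   * Illustrative sketch only (not part of the original class): wiring up a scan-based map job
   * with the initTableMapJob overloads above. MyTableMap and the table/column names are
   * placeholders; Result (org.apache.hadoop.hbase.client) and JobClient (org.apache.hadoop.mapred)
   * would need imports.
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create(), MyTableMap.class);
   *   job.setJobName("example-table-scan");
   *   TableMapReduceUtil.initTableMapJob("mytable", "info:name info:age",
   *       MyTableMap.class, ImmutableBytesWritable.class, Result.class, job);
   *   job.setNumReduceTasks(0);
   *   JobClient.runJob(job);
   */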

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot. It bypasses HBase servers and reads directly from snapshot files.
   *
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust.  Make sure the passed job is
   *                          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *                          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into.
   * @throws IOException When setting up the details fails.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
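  /*
   * Illustrative sketch only (not part of the original class): reading two snapshots in a
   * single job. The snapshot names, restore path, and MyTableMap are placeholders; HashMap,
   * Collections, and Result would need imports.
   *
   *   Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   *   snapshotScans.put("snapshot_a", Collections.singletonList(new Scan()));
   *   snapshotScans.put("snapshot_b", Collections.singletonList(new Scan()));
   *   TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMap.class,
   *       ImmutableBytesWritable.class, Result.class, job, true,
   *       new Path("/tmp/snapshot-restore"));
   */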

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current
   * user should have write permissions to this directory, and it should not be a subdirectory
   * of rootdir. The restore directory can be deleted after the job has finished.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
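  /*
   * Illustrative sketch only (not part of the original class): a map job that reads one
   * snapshot directly from the filesystem. The snapshot name, restore path, and MyTableMap
   * are placeholders.
   *
   *   TableMapReduceUtil.initTableSnapshotMapJob("my_snapshot", "info:name info:age",
   *       MyTableMap.class, ImmutableBytesWritable.class, Result.class, job,
   *       true, new Path("/tmp/snapshot-restore"));
   */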

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions =
        MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }
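  /*
   * Illustrative sketch only (not part of the original class): a reduce job that writes back
   * to HBase, partitioned by region so each reducer handles roughly one region's key range.
   * MyTableReduce and the table name are placeholders; JobClient would need an import.
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create(), MyTableReduce.class);
   *   TableMapReduceUtil.initTableReduceJob("output_table", MyTableReduce.class, job,
   *       HRegionPartitioner.class);
   *   JobClient.runJob(job);
   */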

  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }
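  /*
   * Note (editorial, not part of the original class): initTableMapJob and initTableReduceJob
   * already call initCredentials themselves; calling it directly is mainly useful when HBase
   * input/output is configured by hand. On a secure cluster it obtains an HBase authentication
   * token for the submitting user and attaches it to the job, for example:
   *
   *   TableMapReduceUtil.initCredentials(job);
   */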

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions)
      job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
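  /*
   * Illustrative sketch only (not part of the original class): scanner caching trades RPC
   * round-trips for client and server memory, e.g. fetching 500 rows per scanner call
   * instead of the configured default.
   *
   *   TableMapReduceUtil.setScannerCaching(job, 500);
   */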

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}