/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.KeeperException;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }
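
  // A minimal usage sketch for the overload above, assuming a hypothetical driver
  // class MyDriver, a table named "mytable" and a column "cf:qual" (all placeholders).
  // IdentityTableMap, from this package, passes each row key/Result pair through
  // unchanged; FileOutputFormat, JobClient and Path come from the mapred/fs packages:
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   job.setJobName("scan-mytable");
  //   TableMapReduceUtil.initTableMapJob("mytable", "cf:qual",
  //     IdentityTableMap.class, ImmutableBytesWritable.class, Result.class, job);
  //   job.setNumReduceTasks(0);
  //   FileOutputFormat.setOutputPath(job, new Path("/tmp/scan-mytable-out"));
  //   JobClient.runJob(job);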

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   * Same as {@link #initTableMapJob(String, String, Class, Class, Class, JobConf)}, but
   * lets the caller decide whether HBase and job-class jars are shipped to the cluster
   * via the distributed cache (tmpjars).
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format class to use; the other overloads pass
   *           {@link TableInputFormat}.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }
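
  // A minimal sketch of the long form above, assuming the HBase jars are already on the
  // cluster classpath (so addDependencyJars is false) and the stock TableInputFormat is
  // used; any other org.apache.hadoop.mapred.InputFormat producing
  // (ImmutableBytesWritable, Result) pairs could be passed instead. Table and column
  // names are placeholders:
  //
  //   TableMapReduceUtil.initTableMapJob("mytable", "cf:qual",
  //     IdentityTableMap.class, ImmutableBytesWritable.class, Result.class, job,
  //     false, TableInputFormat.class);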

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from the snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current
   * user should have write permissions to this directory, and it should not be a subdirectory
   * of rootdir. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
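
  // A minimal sketch, assuming an existing snapshot named "mytable-snapshot" and a
  // writable restore directory outside the HBase root directory (both placeholders),
  // plus a hypothetical driver class MyDriver:
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   Path restoreDir = new Path("/tmp/mytable-snapshot-restore");
  //   TableMapReduceUtil.initTableSnapshotMapJob("mytable-snapshot", "cf:qual",
  //     IdentityTableMap.class, ImmutableBytesWritable.class, Result.class, job,
  //     true, restoreDir);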

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }
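
  // A minimal end-to-end sketch, assuming a hypothetical TableMap implementation MyMap
  // that turns each Result into a Put keyed by row, so the map output types line up with
  // IdentityTableReduce (which writes each Put to the output table). Table names are
  // placeholders:
  //
  //   TableMapReduceUtil.initTableMapJob("source_table", "cf:qual",
  //     MyMap.class, ImmutableBytesWritable.class, Put.class, job);
  //   TableMapReduceUtil.initTableReduceJob("target_table", IdentityTableReduce.class, job);
  //   JobClient.runJob(job);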

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions =
        MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }
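
  // A minimal sketch of the partitioner-aware form: passing HRegionPartitioner routes each
  // output row to the reducer that handles the region which will host it, and (per the code
  // above) the reducer count is capped at the region count of "target_table" (a placeholder):
  //
  //   TableMapReduceUtil.initTableReduceJob("target_table", IdentityTableReduce.class,
  //     job, HRegionPartitioner.class);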

  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // obtain an HBase authentication token for the current user and add it to the job
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }
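
  // The init* helpers above call initCredentials for you; an explicit call is only needed
  // when a job is wired up by hand against a secure (Kerberos-enabled) cluster. A minimal
  // sketch, with MyDriver as a placeholder:
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   // ... set input/output formats, mapper, reducer, etc. by hand ...
  //   TableMapReduceUtil.initCredentials(job);  // adds an HBase authentication token
  //   JobClient.runJob(job);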

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions)
      job.setNumMapTasks(regions);
  }
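
  // A minimal sketch: callers that pick task counts themselves can clamp them to the
  // region count of the table afterwards ("mytable" is a placeholder):
  //
  //   job.setNumMapTasks(100);
  //   job.setNumReduceTasks(100);
  //   TableMapReduceUtil.limitNumMapTasks("mytable", job);
  //   TableMapReduceUtil.limitNumReduceTasks("mytable", job);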

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }
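
  // Unlike the limit* helpers above, these overwrite the configured task count with the
  // region count outright. A minimal sketch against a placeholder table:
  //
  //   TableMapReduceUtil.setNumReduceTasks("target_table", job);  // one reducer per region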

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
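
  // A minimal sketch; 500 is an arbitrary value. Larger caching values mean fewer RPC
  // round trips per map task at the cost of more client-side heap per open scanner:
  //
  //   TableMapReduceUtil.setScannerCaching(job, 500);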

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
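
  // The init* helpers already call addDependencyJars when asked to; calling it directly
  // mainly makes sense for jobs configured without those helpers, after all job classes
  // have been set. A minimal sketch, with MyMap as a placeholder mapper:
  //
  //   job.setMapperClass(MyMap.class);
  //   job.setOutputFormat(TableOutputFormat.class);
  //   TableMapReduceUtil.addDependencyJars(job);  // ships HBase and job-class jars via tmpjars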
}