/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.KeeperException;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
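   *
   * <p>A minimal usage sketch; the driver class, table name, column list, and mapper
   * below are illustrative, not part of this API:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   * job.setJobName("exampleTableRead");
   * // MyMapper implements TableMap<Text, Text>
   * TableMapReduceUtil.initTableMapJob("mytable", "cf1:col1 cf1:col2",
   *   MyMapper.class, Text.class, Text.class, job);
   * JobClient.runJob(job);
   * }</pre>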
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

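  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */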
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format class to use.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   * should have write permissions to this directory, and it should not be a subdirectory of
   * rootdir. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
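   *
   * <p>A minimal usage sketch; the snapshot name, column list, restore directory, and
   * mapper below are illustrative, not part of this API:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   * // Must be writable by the current user and outside the HBase rootdir.
   * Path restoreDir = new Path("/tmp/snapshot-restore");
   * // MyMapper implements TableMap<Text, Text>
   * TableMapReduceUtil.initTableSnapshotMapJob("mysnapshot", "cf1:col1",
   *   MyMapper.class, Text.class, Text.class, job, true, restoreDir);
   * JobClient.runJob(job);
   * }</pre>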
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
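   *
   * <p>A minimal usage sketch; the output table name and reducer below are illustrative,
   * not part of this API:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   * // MyTableReducer implements TableReduce<Text, Text> and emits Put mutations
   * TableMapReduceUtil.initTableReduceJob("outputtable", MyTableReducer.class, job);
   * }</pre>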
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table);
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

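  /**
   * When security is enabled, obtains an HBase authentication token for the current user and
   * adds it to the job credentials, and propagates any delegation token file configured via
   * <code>HADOOP_TOKEN_FILE_LOCATION</code>. Called by the <code>initTableMapJob</code> and
   * <code>initTableReduceJob</code> methods above.
   *
   * @param job  The current job configuration to adjust.
   * @throws IOException When obtaining the authentication token fails.
   */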
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
        if (authToken == null) {
          user.obtainAuthTokenForJob(job);
        } else {
          job.getCredentials().addToken(authToken.getService(), authToken);
        }
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Get the authentication token of the user for the cluster specified in the configuration.
   *
   * @param conf  The configuration identifying the cluster.
   * @param user  The user whose tokens are searched.
   * @return null if the user does not have the token, otherwise the auth token for the cluster.
   */
  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
      throws IOException, InterruptedException {
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
    try {
      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
      return new AuthenticationTokenSelector().selectToken(
          new Text(clusterId), user.getUGI().getTokens());
    } catch (KeeperException e) {
      throw new IOException(e);
    } finally {
      zkw.close();
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumMapTasks() > regions)
      job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
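   *
   * <p>For example (the value 500 is purely illustrative; tune it to your row size and heap):
   *
   * <pre>{@code
   * TableMapReduceUtil.setScannerCaching(job, 500);
   * }</pre>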
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Adds the HBase dependency jars, plus jars for the job's configured input/output formats,
   * key/value classes, partitioner, and combiner, to the distributed cache (tmpjars).
   *
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}