/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.KeeperException;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }
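
  /*
   * A minimal usage sketch (not part of the original class): MyTableMap, the
   * "mytable" table and the "info:status" column are illustrative names only,
   * and the output key/value classes must match whatever the mapper actually
   * emits.
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create());
   *   job.setJobName("scan mytable");
   *   TableMapReduceUtil.initTableMapJob("mytable", "info:status",
   *     MyTableMap.class, ImmutableBytesWritable.class, Result.class, job);
   *   JobClient.runJob(job);
   */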

  /**
   * Use this before submitting a TableMap job. Same as
   * {@link #initTableMapJob(String, String, Class, Class, Class, JobConf)},
   * with {@code addDependencyJars} controlling whether HBase and job class
   * jars are shipped via the distributed cache (tmpjars).
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat  The input format to use. Pass {@link TableInputFormat}
   *           for a plain table scan.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current
   * user should have write permissions to this directory, and it should not be a subdirectory
   * of the HBase root directory. The restore directory can be deleted after the job is finished.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
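
  /*
   * A rough usage sketch (illustrative only, not from the original source):
   * the snapshot name, restore path and mapper below are made-up examples.
   * The restore directory must be writable by the submitting user and must
   * not sit under the HBase root directory.
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create());
   *   Path restoreDir = new Path("/tmp/snapshot-restore");  // hypothetical location
   *   TableMapReduceUtil.initTableSnapshotMapJob("mytable-snapshot", "info:status",
   *     MyTableMap.class, ImmutableBytesWritable.class, Result.class, job,
   *     true, restoreDir);
   *   JobClient.runJob(job);
   */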

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   * default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions =
        MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }
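
  /*
   * A usage sketch for a table-writing reduce job (the table and reducer
   * names are illustrative, not part of this class). Passing
   * HRegionPartitioner.class routes each region's output to one reducer and,
   * as implemented above, caps the reduce count at the region count.
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create());
   *   TableMapReduceUtil.initTableReduceJob("mytable", MyTableReduce.class, job,
   *     HRegionPartitioner.class);
   *   JobClient.runJob(job);
   */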

  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
        if (authToken == null) {
          user.obtainAuthTokenForJob(job);
        } else {
          job.getCredentials().addToken(authToken.getService(), authToken);
        }
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      }
    }
  }
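
  /*
   * Note: the init*Job helpers above already call initCredentials, so an
   * explicit call is only needed for a JobConf assembled by hand. A sketch
   * (assuming a secure cluster and an otherwise fully configured job):
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create());
   *   // ... set input/output formats, mapper, reducer, etc. ...
   *   TableMapReduceUtil.initCredentials(job);
   *   JobClient.runJob(job);
   */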

  /**
   * Get the authentication token of the user for the cluster specified in the configuration
   * @return null if the user does not have the token, otherwise the auth token for the cluster.
   */
  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
      throws IOException, InterruptedException {
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
    try {
      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
      return new AuthenticationTokenSelector().selectToken(
        new Text(clusterId), user.getUGI().getTokens());
    } catch (KeeperException e) {
      throw new IOException(e);
    } finally {
      zkw.close();
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }
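
  /*
   * A sizing sketch ("mytable" is an illustrative name): the set* methods pin
   * the task count to the current region count, while the limit* methods only
   * lower a count that is already higher than the region count.
   *
   *   TableMapReduceUtil.setNumReduceTasks("mytable", job);
   *   TableMapReduceUtil.limitNumMapTasks("mytable", job);
   */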

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to fetch and cache with each scanner
   * iteration (stored as <code>hbase.client.scanner.caching</code>).
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
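
  /*
   * Example (the value 500 is only a starting point; the right number depends
   * on row size and mapper heap, since every cached row is held in memory):
   *
   *   TableMapReduceUtil.setScannerCaching(job, 500);
   */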

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
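
  /*
   * A sketch of shipping jars for a hand-built JobConf (the init*Job helpers
   * call this automatically when addDependencyJars is true): the jars
   * containing the job's key/value, partitioner, combiner and format classes,
   * plus the HBase jars, are added to the distributed cache (tmpjars).
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create());
   *   // ... configure the job ...
   *   TableMapReduceUtil.addDependencyJars(job);
   */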
}