/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.KeeperException;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

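  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars  Whether to upload the HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   */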
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars  Whether to upload the HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }
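
  // A minimal, hypothetical driver sketch showing how initTableMapJob is typically used.
  // MyDriver, MyTableMap, the table name, and the column list are illustrative placeholders,
  // not part of this class; the driver code would live outside this file.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   job.setJobName("exampleTableMapJob");
  //   TableMapReduceUtil.initTableMapJob("myTable", "info:name info:age",
  //       MyTableMap.class, ImmutableBytesWritable.class, Result.class, job);
  //   job.setNumReduceTasks(0);
  //   JobClient.runJob(job);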

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName  The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust. Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @param addDependencyJars  Whether to upload the HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir  A temporary directory to copy the snapshot files into. The current user
   *          should have write permissions to this directory, and it should not be a subdirectory
   *          of the HBase root directory (rootdir). The restore directory can be deleted once the
   *          job has finished.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
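
  // An illustrative sketch of the snapshot path. The snapshot name, restore directory, and
  // MyTableMap are placeholders; the restore directory must be writable by the current user
  // and must not live under the HBase root directory.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   TableMapReduceUtil.initTableSnapshotMapJob("mySnapshot", "info:name",
  //       MyTableMap.class, ImmutableBytesWritable.class, Result.class, job,
  //       true, new Path("/tmp/snapshot-restore"));
  //   JobClient.runJob(job);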

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   * default partitioner.
   * @param addDependencyJars  Whether to upload the HBase jars and jars for any of the configured
   *          job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table);
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }
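
  // A sketch of a combined map/reduce setup writing back to HBase, using placeholder names
  // (MyDriver, MyTableMap, MyTableReduce, table names). Passing HRegionPartitioner caps the
  // reduce count at the number of regions in the output table, as described in the Javadoc above.
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   TableMapReduceUtil.initTableMapJob("inputTable", "info:count",
  //       MyTableMap.class, ImmutableBytesWritable.class, Put.class, job);
  //   TableMapReduceUtil.initTableReduceJob("outputTable", MyTableReduce.class, job,
  //       HRegionPartitioner.class);
  //   JobClient.runJob(job);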

  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
        if (authToken == null) {
          user.obtainAuthTokenForJob(job);
        } else {
          job.getCredentials().addToken(authToken.getService(), authToken);
        }
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      }
    }
  }
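
  // Usage note (a sketch, not prescriptive): initTableMapJob and initTableReduceJob already call
  // initCredentials, so an explicit call is only needed for jobs on a secure cluster that set up
  // their HBase input/output themselves, e.g. in hypothetical driver code outside this class:
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
  //   // ... custom input/output format configuration ...
  //   TableMapReduceUtil.initCredentials(job);
  //   JobClient.runJob(job);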

  /**
   * Get the authentication token of the user for the cluster specified in the configuration.
   * @return null if the user does not have the token, otherwise the auth token for the cluster.
   */
  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
      throws IOException, InterruptedException {
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
    try {
      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
      return new AuthenticationTokenSelector().selectToken(
          new Text(clusterId), user.getUGI().getTokens());
    } catch (KeeperException e) {
      throw new IOException(e);
    } finally {
      zkw.close();
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumMapTasks() > regions)
      job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), table));
  }
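
  // Illustrative use of the task-count helpers above; the table name is a placeholder.
  // limitNumReduceTasks only lowers the reduce count if it exceeds the region count, while
  // setNumReduceTasks pins it to the region count outright.
  //
  //   job.setNumReduceTasks(50);
  //   TableMapReduceUtil.limitNumReduceTasks("outputTable", job);  // capped at #regions if lower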

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
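
  // Example only: the value trades scan round trips against client heap, so 500 is purely
  // illustrative and should be tuned per job.
  //
  //   TableMapReduceUtil.setScannerCaching(job, 500);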

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}
358 }