
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.KeeperException;

/**
 * Utility for {@link TableMap} and {@link TableReduce}.
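 *
 * <p>This is the old-API ({@code org.apache.hadoop.mapred}) variant. For jobs written
 * against the new MapReduce API, see
 * {@link org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil}.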
 */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
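   *
   * <p>A minimal driver sketch (the table name, column list, and {@code MyMapper}
   * class are illustrative, not part of HBase):
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * // MyMapper extends TableMap<Text, Text> and is assumed to exist on the classpath.
   * TableMapReduceUtil.initTableMapJob("mytable", "info:name info:age",
   *   MyMapper.class, Text.class, Text.class, job);
   * JobClient.runJob(job);
   * }</pre>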
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars  Upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {

    job.setInputFormat(TableInputFormat.class);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        // Shipping dependency jars is best-effort; report the failure but keep the
        // non-throwing signature.
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // Credential setup failures are likewise reported but not rethrown, preserving
      // the original non-throwing signature.
      ioe.printStackTrace();
    }
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
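   *
   * <p>A minimal driver sketch (the table name and {@code MyReducer} class are
   * illustrative, not part of HBase):
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * // MyReducer extends TableReduce<Text, Text> and is assumed to exist on the classpath.
   * TableMapReduceUtil.initTableReduceJob("mytable", MyReducer.class, job);
   * JobClient.runJob(job);
   * }</pre>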
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   * default partitioner.
   * @param addDependencyJars  Upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

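  /**
   * Sets up security credentials on the job: if Hadoop security is enabled, the
   * launcher's delegation token file (HADOOP_TOKEN_FILE_LOCATION) is propagated to
   * the MR job; if HBase security is enabled, an HBase authentication token for the
   * current user is reused or obtained and added to the job's credentials.
   *
   * @param job  The current job configuration to adjust.
   * @throws IOException When obtaining the authentication token fails.
   */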
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // Reuse an existing authentication token for this cluster if the current
        // user already holds one; otherwise obtain a new one for the job.
        User user = userProvider.getCurrent();
        Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
        if (authToken == null) {
          user.obtainAuthTokenForJob(job);
        } else {
          job.getCredentials().addToken(authToken.getService(), authToken);
        }
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Gets the authentication token of the user for the cluster specified in the configuration.
   * @return null if the user does not have the token, otherwise the auth token for the cluster.
   */
  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
      throws IOException, InterruptedException {
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
    try {
      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
      return new AuthenticationTokenSelector().selectToken(
          new Text(clusterId), user.getUGI().getTokens());
    } catch (KeeperException e) {
      throw new IOException(e);
    } finally {
      zkw.close();
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions = MetaReader.getRegionCount(HBaseConfiguration.create(job), table);
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaReader.getRegionCount(HBaseConfiguration.create(job), table));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaReader.getRegionCount(HBaseConfiguration.create(job), table));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
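   *
   * <p>For example (the value 500 is purely illustrative; tune it to your row size
   * and available heap):
   * <pre>{@code
   * TableMapReduceUtil.setScannerCaching(job, 500);
   * }</pre>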
   *
   * @param job  The current job configuration to adjust.
   * @param batchSize  The number of rows to fetch and cache per scanner round trip
   * (stored as <code>hbase.client.scanner.caching</code>).
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
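   * Adds the HBase dependency jars, plus the jars containing the job's configured
   * map output key/value, output key/value, partitioner, input/output format, and
   * combiner classes, to the distributed cache (tmpjars).
   *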
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      // when making changes here, consider also mapreduce.TableMapReduceUtil
      // pull job classes
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}