View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.File;
22  import java.io.IOException;
23  import java.net.URL;
24  import java.net.URLDecoder;
25  import java.util.*;
26  import java.util.zip.ZipEntry;
27  import java.util.zip.ZipFile;
28  
29  import com.google.protobuf.InvalidProtocolBufferException;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HBaseConfiguration;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.MetaTableAccessor;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.classification.InterfaceStability;
41  import org.apache.hadoop.hbase.client.Connection;
42  import org.apache.hadoop.hbase.client.ConnectionFactory;
43  import org.apache.hadoop.hbase.client.Put;
44  import org.apache.hadoop.hbase.client.Scan;
45  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
46  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
48  import org.apache.hadoop.hbase.security.User;
49  import org.apache.hadoop.hbase.security.UserProvider;
50  import org.apache.hadoop.hbase.security.token.TokenUtil;
51  import org.apache.hadoop.hbase.util.Base64;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.zookeeper.ZKConfig;
54  import org.apache.hadoop.io.Writable;
55  import org.apache.hadoop.mapreduce.InputFormat;
56  import org.apache.hadoop.mapreduce.Job;
57  import org.apache.hadoop.util.StringUtils;
58  
59  /**
60   * Utility for {@link TableMapper} and {@link TableReducer}
61   */
62  @SuppressWarnings({ "rawtypes", "unchecked" })
63  @InterfaceAudience.Public
64  @InterfaceStability.Stable
65  public class TableMapReduceUtil {
66    private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
67  
68    /**
69     * Use this before submitting a TableMap job. It will appropriately set up
70     * the job.
71     *
72     * @param table  The table name to read from.
73     * @param scan  The scan instance with the columns, time range etc.
74     * @param mapper  The mapper class to use.
75     * @param outputKeyClass  The class of the output key.
76     * @param outputValueClass  The class of the output value.
77     * @param job  The current job to adjust.  Make sure the passed job is
78     * carrying all necessary HBase configuration.
79     * @throws IOException When setting up the details fails.
80     */
81    public static void initTableMapperJob(String table, Scan scan,
82        Class<? extends TableMapper> mapper,
83        Class<?> outputKeyClass,
84        Class<?> outputValueClass, Job job)
85    throws IOException {
86      initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
87          job, true);
88    }
89  
90  
91    /**
92     * Use this before submitting a TableMap job. It will appropriately set up
93     * the job.
94     *
95     * @param table  The table name to read from.
96     * @param scan  The scan instance with the columns, time range etc.
97     * @param mapper  The mapper class to use.
98     * @param outputKeyClass  The class of the output key.
99     * @param outputValueClass  The class of the output value.
100    * @param job  The current job to adjust.  Make sure the passed job is
101    * carrying all necessary HBase configuration.
102    * @throws IOException When setting up the details fails.
103    */
104   public static void initTableMapperJob(TableName table,
105       Scan scan,
106       Class<? extends TableMapper> mapper,
107       Class<?> outputKeyClass,
108       Class<?> outputValueClass,
109       Job job) throws IOException {
110     initTableMapperJob(table.getNameAsString(),
111         scan,
112         mapper,
113         outputKeyClass,
114         outputValueClass,
115         job,
116         true);
117   }
118 
119   /**
120    * Use this before submitting a TableMap job. It will appropriately set up
121    * the job.
122    *
123    * @param table Binary representation of the table name to read from.
124    * @param scan  The scan instance with the columns, time range etc.
125    * @param mapper  The mapper class to use.
126    * @param outputKeyClass  The class of the output key.
127    * @param outputValueClass  The class of the output value.
128    * @param job  The current job to adjust.  Make sure the passed job is
129    * carrying all necessary HBase configuration.
130    * @throws IOException When setting up the details fails.
131    */
132    public static void initTableMapperJob(byte[] table, Scan scan,
133       Class<? extends TableMapper> mapper,
134       Class<?> outputKeyClass,
135       Class<?> outputValueClass, Job job)
136   throws IOException {
137       initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
138               job, true);
139   }
140 
141    /**
142     * Use this before submitting a TableMap job. It will appropriately set up
143     * the job.
144     *
145     * @param table  The table name to read from.
146     * @param scan  The scan instance with the columns, time range etc.
147     * @param mapper  The mapper class to use.
148     * @param outputKeyClass  The class of the output key.
149     * @param outputValueClass  The class of the output value.
150     * @param job  The current job to adjust.  Make sure the passed job is
151     * carrying all necessary HBase configuration.
152     * @param addDependencyJars upload HBase jars and jars for any of the configured
153     *           job classes via the distributed cache (tmpjars).
154     * @throws IOException When setting up the details fails.
155     */
156    public static void initTableMapperJob(String table, Scan scan,
157        Class<? extends TableMapper> mapper,
158        Class<?> outputKeyClass,
159        Class<?> outputValueClass, Job job,
160        boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
161    throws IOException {
162      initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
163          addDependencyJars, true, inputFormatClass);
164    }
165 
166 
167   /**
168    * Use this before submitting a TableMap job. It will appropriately set up
169    * the job.
170    *
171    * @param table  The table name to read from.
172    * @param scan  The scan instance with the columns, time range etc.
173    * @param mapper  The mapper class to use.
174    * @param outputKeyClass  The class of the output key.
175    * @param outputValueClass  The class of the output value.
176    * @param job  The current job to adjust.  Make sure the passed job is
177    * carrying all necessary HBase configuration.
178    * @param addDependencyJars upload HBase jars and jars for any of the configured
179    *           job classes via the distributed cache (tmpjars).
180    * @param initCredentials whether to initialize hbase auth credentials for the job
181    * @param inputFormatClass the input format
182    * @throws IOException When setting up the details fails.
183    */
184   public static void initTableMapperJob(String table, Scan scan,
185       Class<? extends TableMapper> mapper,
186       Class<?> outputKeyClass,
187       Class<?> outputValueClass, Job job,
188       boolean addDependencyJars, boolean initCredentials,
189       Class<? extends InputFormat> inputFormatClass)
190   throws IOException {
191     job.setInputFormatClass(inputFormatClass);
192     if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
193     if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
194     job.setMapperClass(mapper);
195     if (Put.class.equals(outputValueClass)) {
196       job.setCombinerClass(PutCombiner.class);
197     }
198     Configuration conf = job.getConfiguration();
199     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
200     conf.set(TableInputFormat.INPUT_TABLE, table);
201     conf.set(TableInputFormat.SCAN, convertScanToString(scan));
202     conf.setStrings("io.serializations", conf.get("io.serializations"),
203         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
204         KeyValueSerialization.class.getName());
205     if (addDependencyJars) {
206       addDependencyJars(job);
207     }
208     if (initCredentials) {
209       initCredentials(job);
210     }
211   }
212 
213   /**
214    * Use this before submitting a TableMap job. It will appropriately set up
215    * the job.
216    *
217    * @param table Binary representation of the table name to read from.
218    * @param scan  The scan instance with the columns, time range etc.
219    * @param mapper  The mapper class to use.
220    * @param outputKeyClass  The class of the output key.
221    * @param outputValueClass  The class of the output value.
222    * @param job  The current job to adjust.  Make sure the passed job is
223    * carrying all necessary HBase configuration.
224    * @param addDependencyJars upload HBase jars and jars for any of the configured
225    *           job classes via the distributed cache (tmpjars).
226    * @param inputFormatClass The class of the input format
227    * @throws IOException When setting up the details fails.
228    */
229   public static void initTableMapperJob(byte[] table, Scan scan,
230       Class<? extends TableMapper> mapper,
231       Class<?> outputKeyClass,
232       Class<?> outputValueClass, Job job,
233       boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
234   throws IOException {
235       initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
236               outputValueClass, job, addDependencyJars, inputFormatClass);
237   }
238 
239   /**
240    * Use this before submitting a TableMap job. It will appropriately set up
241    * the job.
242    *
243    * @param table Binary representation of the table name to read from.
244    * @param scan  The scan instance with the columns, time range etc.
245    * @param mapper  The mapper class to use.
246    * @param outputKeyClass  The class of the output key.
247    * @param outputValueClass  The class of the output value.
248    * @param job  The current job to adjust.  Make sure the passed job is
249    * carrying all necessary HBase configuration.
250    * @param addDependencyJars upload HBase jars and jars for any of the configured
251    *           job classes via the distributed cache (tmpjars).
252    * @throws IOException When setting up the details fails.
253    */
254   public static void initTableMapperJob(byte[] table, Scan scan,
255       Class<? extends TableMapper> mapper,
256       Class<?> outputKeyClass,
257       Class<?> outputValueClass, Job job,
258       boolean addDependencyJars)
259   throws IOException {
260       initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
261               outputValueClass, job, addDependencyJars, TableInputFormat.class);
262   }
263 
264   /**
265    * Use this before submitting a TableMap job. It will appropriately set up
266    * the job.
267    *
268    * @param table The table name to read from.
269    * @param scan  The scan instance with the columns, time range etc.
270    * @param mapper  The mapper class to use.
271    * @param outputKeyClass  The class of the output key.
272    * @param outputValueClass  The class of the output value.
273    * @param job  The current job to adjust.  Make sure the passed job is
274    * carrying all necessary HBase configuration.
275    * @param addDependencyJars upload HBase jars and jars for any of the configured
276    *           job classes via the distributed cache (tmpjars).
277    * @throws IOException When setting up the details fails.
278    */
279   public static void initTableMapperJob(String table, Scan scan,
280       Class<? extends TableMapper> mapper,
281       Class<?> outputKeyClass,
282       Class<?> outputValueClass, Job job,
283       boolean addDependencyJars)
284   throws IOException {
285       initTableMapperJob(table, scan, mapper, outputKeyClass,
286               outputValueClass, job, addDependencyJars, TableInputFormat.class);
287   }
288 
289   /**
290    * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
291    * direct memory will likely cause the map tasks to OOM when opening the region. This
292    * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
293    * wants to override this behavior in their job.
294    */
295   public static void resetCacheConfig(Configuration conf) {
296     conf.setFloat(
297       HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
298     conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
299     conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
300   }
301 
302   /**
303    * Sets up the job for reading from one or more table snapshots, with one or more scans
304    * per snapshot.
305    * It bypasses hbase servers and read directly from snapshot files.
306    *
307    * @param snapshotScans     map of snapshot name to scans on that snapshot.
308    * @param mapper            The mapper class to use.
309    * @param outputKeyClass    The class of the output key.
310    * @param outputValueClass  The class of the output value.
311    * @param job               The current job to adjust.  Make sure the passed job is
312    *                          carrying all necessary HBase configuration.
313    * @param addDependencyJars upload HBase jars and jars for any of the configured
314    *                          job classes via the distributed cache (tmpjars).
315    */
316   public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
317       Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
318       Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
319     MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
320 
321     job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
322     if (outputValueClass != null) {
323       job.setMapOutputValueClass(outputValueClass);
324     }
325     if (outputKeyClass != null) {
326       job.setMapOutputKeyClass(outputKeyClass);
327     }
328     job.setMapperClass(mapper);
329     Configuration conf = job.getConfiguration();
330     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
331 
332     if (addDependencyJars) {
333       addDependencyJars(job);
334     }
335 
336     resetCacheConfig(job.getConfiguration());
337   }
338 
339   /**
340    * Sets up the job for reading from a table snapshot. It bypasses hbase servers
341    * and read directly from snapshot files.
342    *
343    * @param snapshotName The name of the snapshot (of a table) to read from.
344    * @param scan  The scan instance with the columns, time range etc.
345    * @param mapper  The mapper class to use.
346    * @param outputKeyClass  The class of the output key.
347    * @param outputValueClass  The class of the output value.
348    * @param job  The current job to adjust.  Make sure the passed job is
349    * carrying all necessary HBase configuration.
350    * @param addDependencyJars upload HBase jars and jars for any of the configured
351    *           job classes via the distributed cache (tmpjars).
352    *
353    * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
354    * have write permissions to this directory, and this should not be a subdirectory of rootdir.
355    * After the job is finished, restore directory can be deleted.
356    * @throws IOException When setting up the details fails.
357    * @see TableSnapshotInputFormat
358    */
359   public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
360       Class<? extends TableMapper> mapper,
361       Class<?> outputKeyClass,
362       Class<?> outputValueClass, Job job,
363       boolean addDependencyJars, Path tmpRestoreDir)
364   throws IOException {
365     TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
366     initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
367         outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
368     resetCacheConfig(job.getConfiguration());
369   }
370 
371   /**
372    * Use this before submitting a Multi TableMap job. It will appropriately set
373    * up the job.
374    *
375    * @param scans The list of {@link Scan} objects to read from.
376    * @param mapper The mapper class to use.
377    * @param outputKeyClass The class of the output key.
378    * @param outputValueClass The class of the output value.
379    * @param job The current job to adjust. Make sure the passed job is carrying
380    *          all necessary HBase configuration.
381    * @throws IOException When setting up the details fails.
382    */
383   public static void initTableMapperJob(List<Scan> scans,
384       Class<? extends TableMapper> mapper,
385       Class<?> outputKeyClass,
386       Class<?> outputValueClass, Job job) throws IOException {
387     initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
388         true);
389   }
390 
391   /**
392    * Use this before submitting a Multi TableMap job. It will appropriately set
393    * up the job.
394    *
395    * @param scans The list of {@link Scan} objects to read from.
396    * @param mapper The mapper class to use.
397    * @param outputKeyClass The class of the output key.
398    * @param outputValueClass The class of the output value.
399    * @param job The current job to adjust. Make sure the passed job is carrying
400    *          all necessary HBase configuration.
401    * @param addDependencyJars upload HBase jars and jars for any of the
402    *          configured job classes via the distributed cache (tmpjars).
403    * @throws IOException When setting up the details fails.
404    */
405   public static void initTableMapperJob(List<Scan> scans,
406       Class<? extends TableMapper> mapper,
407       Class<?> outputKeyClass,
408       Class<?> outputValueClass, Job job,
409       boolean addDependencyJars) throws IOException {
410     initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
411       addDependencyJars, true);
412   }
413 
414   /**
415    * Use this before submitting a Multi TableMap job. It will appropriately set
416    * up the job.
417    *
418    * @param scans The list of {@link Scan} objects to read from.
419    * @param mapper The mapper class to use.
420    * @param outputKeyClass The class of the output key.
421    * @param outputValueClass The class of the output value.
422    * @param job The current job to adjust. Make sure the passed job is carrying
423    *          all necessary HBase configuration.
424    * @param addDependencyJars upload HBase jars and jars for any of the
425    *          configured job classes via the distributed cache (tmpjars).
426    * @param initCredentials whether to initialize hbase auth credentials for the job
427    * @throws IOException When setting up the details fails.
428    */
429   public static void initTableMapperJob(List<Scan> scans,
430       Class<? extends TableMapper> mapper,
431       Class<?> outputKeyClass,
432       Class<?> outputValueClass, Job job,
433       boolean addDependencyJars,
434       boolean initCredentials) throws IOException {
435     job.setInputFormatClass(MultiTableInputFormat.class);
436     if (outputValueClass != null) {
437       job.setMapOutputValueClass(outputValueClass);
438     }
439     if (outputKeyClass != null) {
440       job.setMapOutputKeyClass(outputKeyClass);
441     }
442     job.setMapperClass(mapper);
443     Configuration conf = job.getConfiguration();
444     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
445     List<String> scanStrings = new ArrayList<String>();
446 
447     for (Scan scan : scans) {
448       scanStrings.add(convertScanToString(scan));
449     }
450     job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
451       scanStrings.toArray(new String[scanStrings.size()]));
452 
453     if (addDependencyJars) {
454       addDependencyJars(job);
455     }
456 
457     if (initCredentials) {
458       initCredentials(job);
459     }
460   }
461 
462   public static void initCredentials(Job job) throws IOException {
463     UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
464     if (userProvider.isHadoopSecurityEnabled()) {
465       // propagate delegation related props from launcher job to MR job
466       if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
467         job.getConfiguration().set("mapreduce.job.credentials.binary",
468                                    System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
469       }
470     }
471 
472     if (userProvider.isHBaseSecurityEnabled()) {
473       try {
474         // init credentials for remote cluster
475         String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
476         User user = userProvider.getCurrent();
477         if (quorumAddress != null) {
478           Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
479               quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
480           Connection peerConn = ConnectionFactory.createConnection(peerConf);
481           try {
482             TokenUtil.addTokenForJob(peerConn, user, job);
483           } finally {
484             peerConn.close();
485           }
486         }
487 
488         Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
489         try {
490           TokenUtil.addTokenForJob(conn, user, job);
491         } finally {
492           conn.close();
493         }
494       } catch (InterruptedException ie) {
495         LOG.info("Interrupted obtaining user authentication token");
496         Thread.currentThread().interrupt();
497       }
498     }
499   }
500 
501   /**
502    * Obtain an authentication token, for the specified cluster, on behalf of the current user
503    * and add it to the credentials for the given map reduce job.
504    *
505    * The quorumAddress is the key to the ZK ensemble, which contains:
506    * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
507    * zookeeper.znode.parent
508    *
509    * @param job The job that requires the permission.
510    * @param quorumAddress string that contains the 3 required configuratins
511    * @throws IOException When the authentication token cannot be obtained.
512    * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
513    */
514   @Deprecated
515   public static void initCredentialsForCluster(Job job, String quorumAddress)
516       throws IOException {
517     Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
518         quorumAddress);
519     initCredentialsForCluster(job, peerConf);
520   }
521 
522   /**
523    * Obtain an authentication token, for the specified cluster, on behalf of the current user
524    * and add it to the credentials for the given map reduce job.
525    *
526    * @param job The job that requires the permission.
527    * @param conf The configuration to use in connecting to the peer cluster
528    * @throws IOException When the authentication token cannot be obtained.
529    */
530   public static void initCredentialsForCluster(Job job, Configuration conf)
531       throws IOException {
532     UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
533     if (userProvider.isHBaseSecurityEnabled()) {
534       try {
535         Connection peerConn = ConnectionFactory.createConnection(conf);
536         try {
537           TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
538         } finally {
539           peerConn.close();
540         }
541       } catch (InterruptedException e) {
542         LOG.info("Interrupted obtaining user authentication token");
543         Thread.interrupted();
544       }
545     }
546   }
547 
548   /**
549    * Writes the given scan into a Base64 encoded string.
550    *
551    * @param scan  The scan to write out.
552    * @return The scan saved in a Base64 encoded string.
553    * @throws IOException When writing the scan fails.
554    */
555   static String convertScanToString(Scan scan) throws IOException {
556     ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
557     return Base64.encodeBytes(proto.toByteArray());
558   }
559 
560   /**
561    * Converts the given Base64 string back into a Scan instance.
562    *
563    * @param base64  The scan details.
564    * @return The newly created Scan instance.
565    * @throws IOException When reading the scan instance fails.
566    */
567   static Scan convertStringToScan(String base64) throws IOException {
568     byte [] decoded = Base64.decode(base64);
569     ClientProtos.Scan scan;
570     try {
571       scan = ClientProtos.Scan.parseFrom(decoded);
572     } catch (InvalidProtocolBufferException ipbe) {
573       throw new IOException(ipbe);
574     }
575 
576     return ProtobufUtil.toScan(scan);
577   }
578 
579   /**
580    * Use this before submitting a TableReduce job. It will
581    * appropriately set up the JobConf.
582    *
583    * @param table  The output table.
584    * @param reducer  The reducer class to use.
585    * @param job  The current job to adjust.
586    * @throws IOException When determining the region count fails.
587    */
588   public static void initTableReducerJob(String table,
589     Class<? extends TableReducer> reducer, Job job)
590   throws IOException {
591     initTableReducerJob(table, reducer, job, null);
592   }
593 
594   /**
595    * Use this before submitting a TableReduce job. It will
596    * appropriately set up the JobConf.
597    *
598    * @param table  The output table.
599    * @param reducer  The reducer class to use.
600    * @param job  The current job to adjust.
601    * @param partitioner  Partitioner to use. Pass <code>null</code> to use
602    * default partitioner.
603    * @throws IOException When determining the region count fails.
604    */
605   public static void initTableReducerJob(String table,
606     Class<? extends TableReducer> reducer, Job job,
607     Class partitioner) throws IOException {
608     initTableReducerJob(table, reducer, job, partitioner, null, null, null);
609   }
610 
611   /**
612    * Use this before submitting a TableReduce job. It will
613    * appropriately set up the JobConf.
614    *
615    * @param table  The output table.
616    * @param reducer  The reducer class to use.
617    * @param job  The current job to adjust.  Make sure the passed job is
618    * carrying all necessary HBase configuration.
619    * @param partitioner  Partitioner to use. Pass <code>null</code> to use
620    * default partitioner.
621    * @param quorumAddress Distant cluster to write to; default is null for
622    * output to the cluster that is designated in <code>hbase-site.xml</code>.
623    * Set this String to the zookeeper ensemble of an alternate remote cluster
624    * when you would have the reduce write a cluster that is other than the
625    * default; e.g. copying tables between clusters, the source would be
626    * designated by <code>hbase-site.xml</code> and this param would have the
627    * ensemble address of the remote cluster.  The format to pass is particular.
628    * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
629    *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
630    * </code> such as <code>server,server2,server3:2181:/hbase</code>.
631    * @param serverClass redefined hbase.regionserver.class
632    * @param serverImpl redefined hbase.regionserver.impl
633    * @throws IOException When determining the region count fails.
634    */
635   public static void initTableReducerJob(String table,
636     Class<? extends TableReducer> reducer, Job job,
637     Class partitioner, String quorumAddress, String serverClass,
638     String serverImpl) throws IOException {
639     initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
640         serverClass, serverImpl, true);
641   }
642 
643   /**
644    * Use this before submitting a TableReduce job. It will
645    * appropriately set up the JobConf.
646    *
647    * @param table  The output table.
648    * @param reducer  The reducer class to use.
649    * @param job  The current job to adjust.  Make sure the passed job is
650    * carrying all necessary HBase configuration.
651    * @param partitioner  Partitioner to use. Pass <code>null</code> to use
652    * default partitioner.
653    * @param quorumAddress Distant cluster to write to; default is null for
654    * output to the cluster that is designated in <code>hbase-site.xml</code>.
655    * Set this String to the zookeeper ensemble of an alternate remote cluster
656    * when you would have the reduce write a cluster that is other than the
657    * default; e.g. copying tables between clusters, the source would be
658    * designated by <code>hbase-site.xml</code> and this param would have the
659    * ensemble address of the remote cluster.  The format to pass is particular.
660    * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
661    *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
662    * </code> such as <code>server,server2,server3:2181:/hbase</code>.
663    * @param serverClass redefined hbase.regionserver.class
664    * @param serverImpl redefined hbase.regionserver.impl
665    * @param addDependencyJars upload HBase jars and jars for any of the configured
666    *           job classes via the distributed cache (tmpjars).
667    * @throws IOException When determining the region count fails.
668    */
669   public static void initTableReducerJob(String table,
670     Class<? extends TableReducer> reducer, Job job,
671     Class partitioner, String quorumAddress, String serverClass,
672     String serverImpl, boolean addDependencyJars) throws IOException {
673 
674     Configuration conf = job.getConfiguration();
675     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
676     job.setOutputFormatClass(TableOutputFormat.class);
677     if (reducer != null) job.setReducerClass(reducer);
678     conf.set(TableOutputFormat.OUTPUT_TABLE, table);
679     conf.setStrings("io.serializations", conf.get("io.serializations"),
680         MutationSerialization.class.getName(), ResultSerialization.class.getName());
681     // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
682     if (quorumAddress != null) {
683       // Calling this will validate the format
684       ZKConfig.validateClusterKey(quorumAddress);
685       conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
686     }
687     if (serverClass != null && serverImpl != null) {
688       conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
689       conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
690     }
691     job.setOutputKeyClass(ImmutableBytesWritable.class);
692     job.setOutputValueClass(Writable.class);
693     if (partitioner == HRegionPartitioner.class) {
694       job.setPartitionerClass(HRegionPartitioner.class);
695       int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
696       if (job.getNumReduceTasks() > regions) {
697         job.setNumReduceTasks(regions);
698       }
699     } else if (partitioner != null) {
700       job.setPartitionerClass(partitioner);
701     }
702 
703     if (addDependencyJars) {
704       addDependencyJars(job);
705     }
706 
707     initCredentials(job);
708   }
709 
710   /**
711    * Ensures that the given number of reduce tasks for the given job
712    * configuration does not exceed the number of regions for the given table.
713    *
714    * @param table  The table to get the region count for.
715    * @param job  The current job to adjust.
716    * @throws IOException When retrieving the table details fails.
717    */
718   public static void limitNumReduceTasks(String table, Job job)
719   throws IOException {
720     int regions =
721       MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
722     if (job.getNumReduceTasks() > regions)
723       job.setNumReduceTasks(regions);
724   }
725 
726   /**
727    * Sets the number of reduce tasks for the given job configuration to the
728    * number of regions the given table has.
729    *
730    * @param table  The table to get the region count for.
731    * @param job  The current job to adjust.
732    * @throws IOException When retrieving the table details fails.
733    */
734   public static void setNumReduceTasks(String table, Job job)
735   throws IOException {
736     job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
737        TableName.valueOf(table)));
738   }
739 
740   /**
741    * Sets the number of rows to return and cache with each scanner iteration.
742    * Higher caching values will enable faster mapreduce jobs at the expense of
743    * requiring more heap to contain the cached rows.
744    *
745    * @param job The current job to adjust.
746    * @param batchSize The number of rows to return in batch with each scanner
747    * iteration.
748    */
749   public static void setScannerCaching(Job job, int batchSize) {
750     job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
751   }
752 
753   /**
754    * Add HBase and its dependencies (only) to the job configuration.
755    * <p>
756    * This is intended as a low-level API, facilitating code reuse between this
757    * class and its mapred counterpart. It also of use to external tools that
758    * need to build a MapReduce job that interacts with HBase but want
759    * fine-grained control over the jars shipped to the cluster.
760    * </p>
761    * @param conf The Configuration object to extend with dependencies.
762    * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
763    * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
764    */
765   public static void addHBaseDependencyJars(Configuration conf) throws IOException {
766 
767     // PrefixTreeCodec is part of the hbase-prefix-tree module. If not included in MR jobs jar
768     // dependencies, MR jobs that write encoded hfiles will fail.
769     // We used reflection here so to prevent a circular module dependency.
770     // TODO - if we extract the MR into a module, make it depend on hbase-prefix-tree.
771     Class prefixTreeCodecClass = null;
772     try {
773       prefixTreeCodecClass =
774           Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
775     } catch (ClassNotFoundException e) {
776       // this will show up in unit tests but should not show in real deployments
777       LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." +
778           "  Continuing without it.");
779     }
780 
781     addDependencyJars(conf,
782       // explicitly pull a class from each module
783       org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
784       org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
785       org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
786       org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
787       org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
788       prefixTreeCodecClass, //  hbase-prefix-tree (if null will be skipped)
789       // pull necessary dependencies
790       org.apache.zookeeper.ZooKeeper.class,
791       io.netty.channel.Channel.class,
792       com.google.protobuf.Message.class,
793       com.google.common.collect.Lists.class,
794       org.apache.htrace.Trace.class,
795       com.yammer.metrics.core.MetricsRegistry.class);
796   }
797 
798   /**
799    * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
800    * Also exposed to shell scripts via `bin/hbase mapredcp`.
801    */
802   public static String buildDependencyClasspath(Configuration conf) {
803     if (conf == null) {
804       throw new IllegalArgumentException("Must provide a configuration object.");
805     }
806     Set<String> paths = new HashSet<String>(conf.getStringCollection("tmpjars"));
807     if (paths.size() == 0) {
808       throw new IllegalArgumentException("Configuration contains no tmpjars.");
809     }
810     StringBuilder sb = new StringBuilder();
811     for (String s : paths) {
812       // entries can take the form 'file:/path/to/file.jar'.
813       int idx = s.indexOf(":");
814       if (idx != -1) s = s.substring(idx + 1);
815       if (sb.length() > 0) sb.append(File.pathSeparator);
816       sb.append(s);
817     }
818     return sb.toString();
819   }
820 
821   /**
822    * Add the HBase dependency jars as well as jars for any of the configured
823    * job classes to the job configuration, so that JobClient will ship them
824    * to the cluster and add them to the DistributedCache.
825    */
826   public static void addDependencyJars(Job job) throws IOException {
827     addHBaseDependencyJars(job.getConfiguration());
828     try {
829       addDependencyJars(job.getConfiguration(),
830           // when making changes here, consider also mapred.TableMapReduceUtil
831           // pull job classes
832           job.getMapOutputKeyClass(),
833           job.getMapOutputValueClass(),
834           job.getInputFormatClass(),
835           job.getOutputKeyClass(),
836           job.getOutputValueClass(),
837           job.getOutputFormatClass(),
838           job.getPartitionerClass(),
839           job.getCombinerClass());
840     } catch (ClassNotFoundException e) {
841       throw new IOException(e);
842     }
843   }
844 
845   /**
846    * Add the jars containing the given classes to the job's configuration
847    * such that JobClient will ship them to the cluster and add them to
848    * the DistributedCache.
849    */
850   public static void addDependencyJars(Configuration conf,
851       Class<?>... classes) throws IOException {
852 
853     FileSystem localFs = FileSystem.getLocal(conf);
854     Set<String> jars = new HashSet<String>();
855     // Add jars that are already in the tmpjars variable
856     jars.addAll(conf.getStringCollection("tmpjars"));
857 
858     // add jars as we find them to a map of contents jar name so that we can avoid
859     // creating new jars for classes that have already been packaged.
860     Map<String, String> packagedClasses = new HashMap<String, String>();
861 
862     // Add jars containing the specified classes
863     for (Class<?> clazz : classes) {
864       if (clazz == null) continue;
865 
866       Path path = findOrCreateJar(clazz, localFs, packagedClasses);
867       if (path == null) {
868         LOG.warn("Could not find jar for class " + clazz +
869                  " in order to ship it to the cluster.");
870         continue;
871       }
872       if (!localFs.exists(path)) {
873         LOG.warn("Could not validate jar file " + path + " for class "
874                  + clazz);
875         continue;
876       }
877       jars.add(path.toString());
878     }
879     if (jars.isEmpty()) return;
880 
881     conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
882   }
883 
884   /**
885    * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
886    * a directory in the classpath, it creates a Jar on the fly with the
887    * contents of the directory and returns the path to that Jar. If a Jar is
888    * created, it is created in the system temporary directory. Otherwise,
889    * returns an existing jar that contains a class of the same name. Maintains
890    * a mapping from jar contents to the tmp jar created.
891    * @param my_class the class to find.
892    * @param fs the FileSystem with which to qualify the returned path.
893    * @param packagedClasses a map of class name to path.
894    * @return a jar file that contains the class.
895    * @throws IOException
896    */
897   private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
898       Map<String, String> packagedClasses)
899   throws IOException {
900     // attempt to locate an existing jar for the class.
901     String jar = findContainingJar(my_class, packagedClasses);
902     if (null == jar || jar.isEmpty()) {
903       jar = getJar(my_class);
904       updateMap(jar, packagedClasses);
905     }
906 
907     if (null == jar || jar.isEmpty()) {
908       return null;
909     }
910 
911     LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
912     return new Path(jar).makeQualified(fs);
913   }
914 
915   /**
916    * Add entries to <code>packagedClasses</code> corresponding to class files
917    * contained in <code>jar</code>.
918    * @param jar The jar who's content to list.
919    * @param packagedClasses map[class -> jar]
920    */
921   private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
922     if (null == jar || jar.isEmpty()) {
923       return;
924     }
925     ZipFile zip = null;
926     try {
927       zip = new ZipFile(jar);
928       for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
929         ZipEntry entry = iter.nextElement();
930         if (entry.getName().endsWith("class")) {
931           packagedClasses.put(entry.getName(), jar);
932         }
933       }
934     } finally {
935       if (null != zip) zip.close();
936     }
937   }
938 
939   /**
940    * Find a jar that contains a class of the same name, if any. It will return
941    * a jar file, even if that is not the first thing on the class path that
942    * has a class with the same name. Looks first on the classpath and then in
943    * the <code>packagedClasses</code> map.
944    * @param my_class the class to find.
945    * @return a jar file that contains the class, or null.
946    * @throws IOException
947    */
948   private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
949       throws IOException {
950     ClassLoader loader = my_class.getClassLoader();
951 
952     String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
953 
954     if (loader != null) {
955       // first search the classpath
956       for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
957         URL url = itr.nextElement();
958         if ("jar".equals(url.getProtocol())) {
959           String toReturn = url.getPath();
960           if (toReturn.startsWith("file:")) {
961             toReturn = toReturn.substring("file:".length());
962           }
963           // URLDecoder is a misnamed class, since it actually decodes
964           // x-www-form-urlencoded MIME type rather than actual
965           // URL encoding (which the file path has). Therefore it would
966           // decode +s to ' 's which is incorrect (spaces are actually
967           // either unencoded or encoded as "%20"). Replace +s first, so
968           // that they are kept sacred during the decoding process.
969           toReturn = toReturn.replaceAll("\\+", "%2B");
970           toReturn = URLDecoder.decode(toReturn, "UTF-8");
971           return toReturn.replaceAll("!.*$", "");
972         }
973       }
974     }
975 
976     // now look in any jars we've packaged using JarFinder. Returns null when
977     // no jar is found.
978     return packagedClasses.get(class_file);
979   }
980 
981   /**
982    * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
983    * configuration contexts (HBASE-8140) and also for testing on MRv2.
984    * check if we have HADOOP-9426.
985    * @param my_class the class to find.
986    * @return a jar file that contains the class, or null.
987    */
988   private static String getJar(Class<?> my_class) {
989     String ret = null;
990     try {
991       ret = JarFinder.getJar(my_class);
992     } catch (Exception e) {
993       // toss all other exceptions, related to reflection failure
994       throw new RuntimeException("getJar invocation failed.", e);
995     }
996 
997     return ret;
998   }
999 }