1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import com.google.protobuf.InvalidProtocolBufferException;
22  import com.codahale.metrics.MetricRegistry;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.FileSystem;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.MetaTableAccessor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.classification.InterfaceStability;
34  import org.apache.hadoop.hbase.client.Connection;
35  import org.apache.hadoop.hbase.client.ConnectionFactory;
36  import org.apache.hadoop.hbase.client.Put;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
39  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
41  import org.apache.hadoop.hbase.security.User;
42  import org.apache.hadoop.hbase.security.UserProvider;
43  import org.apache.hadoop.hbase.security.token.TokenUtil;
44  import org.apache.hadoop.hbase.util.Base64;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.zookeeper.ZKConfig;
47  import org.apache.hadoop.io.Writable;
48  import org.apache.hadoop.mapreduce.InputFormat;
49  import org.apache.hadoop.mapreduce.Job;
50  import org.apache.hadoop.util.StringUtils;
51  
52  import java.io.File;
53  import java.io.IOException;
54  import java.net.URL;
55  import java.net.URLDecoder;
56  import java.util.ArrayList;
57  import java.util.Collection;
58  import java.util.Enumeration;
59  import java.util.HashMap;
60  import java.util.HashSet;
61  import java.util.List;
62  import java.util.Map;
63  import java.util.Set;
64  import java.util.zip.ZipEntry;
65  import java.util.zip.ZipFile;
66  
67  /**
68   * Utility for {@link TableMapper} and {@link TableReducer}
69   */
70  @SuppressWarnings({ "rawtypes", "unchecked" })
71  @InterfaceAudience.Public
72  @InterfaceStability.Stable
73  public class TableMapReduceUtil {
74    private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
75  
76    /**
77     * Use this before submitting a TableMap job. It will appropriately set up
78     * the job.
79     *
80     * @param table  The table name to read from.
81     * @param scan  The scan instance with the columns, time range etc.
82     * @param mapper  The mapper class to use.
83     * @param outputKeyClass  The class of the output key.
84     * @param outputValueClass  The class of the output value.
85     * @param job  The current job to adjust.  Make sure the passed job is
86     * carrying all necessary HBase configuration.
87     * @throws IOException When setting up the details fails.
88     */
89    public static void initTableMapperJob(String table, Scan scan,
90        Class<? extends TableMapper> mapper,
91        Class<?> outputKeyClass,
92        Class<?> outputValueClass, Job job)
93    throws IOException {
94      initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
95          job, true);
96    }
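  /*
   * Editor's note: a minimal, hypothetical driver sketch showing how this helper is typically
   * used. "MyDriver", "MyMapper" and the table name "mytable" are placeholders, not part of
   * this class; key/value types come from org.apache.hadoop.io.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Job job = Job.getInstance(conf, "scan mytable");
   *   job.setJarByClass(MyDriver.class);
   *   Scan scan = new Scan();
   *   scan.setCaching(500);        // fewer RPC round trips for full-table scans
   *   scan.setCacheBlocks(false);  // MR scans should not churn the region server block cache
   *   TableMapReduceUtil.initTableMapperJob("mytable", scan, MyMapper.class,
   *       Text.class, IntWritable.class, job);
   *   job.setNumReduceTasks(0);    // map-only example
   *   System.exit(job.waitForCompletion(true) ? 0 : 1);
   */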
97  
98  
99    /**
100    * Use this before submitting a TableMap job. It will appropriately set up
101    * the job.
102    *
103    * @param table  The table name to read from.
104    * @param scan  The scan instance with the columns, time range etc.
105    * @param mapper  The mapper class to use.
106    * @param outputKeyClass  The class of the output key.
107    * @param outputValueClass  The class of the output value.
108    * @param job  The current job to adjust.  Make sure the passed job is
109    * carrying all necessary HBase configuration.
110    * @throws IOException When setting up the details fails.
111    */
112   public static void initTableMapperJob(TableName table,
113       Scan scan,
114       Class<? extends TableMapper> mapper,
115       Class<?> outputKeyClass,
116       Class<?> outputValueClass,
117       Job job) throws IOException {
118     initTableMapperJob(table.getNameAsString(),
119         scan,
120         mapper,
121         outputKeyClass,
122         outputValueClass,
123         job,
124         true);
125   }
126 
127   /**
128    * Use this before submitting a TableMap job. It will appropriately set up
129    * the job.
130    *
131    * @param table Binary representation of the table name to read from.
132    * @param scan  The scan instance with the columns, time range etc.
133    * @param mapper  The mapper class to use.
134    * @param outputKeyClass  The class of the output key.
135    * @param outputValueClass  The class of the output value.
136    * @param job  The current job to adjust.  Make sure the passed job is
137    * carrying all necessary HBase configuration.
138    * @throws IOException When setting up the details fails.
139    */
140    public static void initTableMapperJob(byte[] table, Scan scan,
141       Class<? extends TableMapper> mapper,
142       Class<?> outputKeyClass,
143       Class<?> outputValueClass, Job job)
144   throws IOException {
145     initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
146         job, true);
147   }
148 
149    /**
150     * Use this before submitting a TableMap job. It will appropriately set up
151     * the job.
152     *
153     * @param table  The table name to read from.
154     * @param scan  The scan instance with the columns, time range etc.
155     * @param mapper  The mapper class to use.
156     * @param outputKeyClass  The class of the output key.
157     * @param outputValueClass  The class of the output value.
158     * @param job  The current job to adjust.  Make sure the passed job is
159     * carrying all necessary HBase configuration.
160     * @param addDependencyJars upload HBase jars and jars for any of the configured
161     *           job classes via the distributed cache (tmpjars).
162     * @throws IOException When setting up the details fails.
163     */
164    public static void initTableMapperJob(String table, Scan scan,
165        Class<? extends TableMapper> mapper,
166        Class<?> outputKeyClass,
167        Class<?> outputValueClass, Job job,
168        boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
169    throws IOException {
170      initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
171          addDependencyJars, true, inputFormatClass);
172    }
173 
174 
175   /**
176    * Use this before submitting a TableMap job. It will appropriately set up
177    * the job.
178    *
179    * @param table  The table name to read from.
180    * @param scan  The scan instance with the columns, time range etc.
181    * @param mapper  The mapper class to use.
182    * @param outputKeyClass  The class of the output key.
183    * @param outputValueClass  The class of the output value.
184    * @param job  The current job to adjust.  Make sure the passed job is
185    * carrying all necessary HBase configuration.
186    * @param addDependencyJars upload HBase jars and jars for any of the configured
187    *           job classes via the distributed cache (tmpjars).
188    * @param initCredentials whether to initialize hbase auth credentials for the job
189    * @param inputFormatClass the input format
190    * @throws IOException When setting up the details fails.
191    */
192   public static void initTableMapperJob(String table, Scan scan,
193       Class<? extends TableMapper> mapper,
194       Class<?> outputKeyClass,
195       Class<?> outputValueClass, Job job,
196       boolean addDependencyJars, boolean initCredentials,
197       Class<? extends InputFormat> inputFormatClass)
198   throws IOException {
199     job.setInputFormatClass(inputFormatClass);
200     if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
201     if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
202     job.setMapperClass(mapper);
203     if (Put.class.equals(outputValueClass)) {
204       job.setCombinerClass(PutCombiner.class);
205     }
206     Configuration conf = job.getConfiguration();
207     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
208     conf.set(TableInputFormat.INPUT_TABLE, table);
209     conf.set(TableInputFormat.SCAN, convertScanToString(scan));
210     conf.setStrings("io.serializations", conf.get("io.serializations"),
211         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
212         KeyValueSerialization.class.getName());
213     if (addDependencyJars) {
214       addDependencyJars(job);
215     }
216     if (initCredentials) {
217       initCredentials(job);
218     }
219   }
220 
221   /**
222    * Use this before submitting a TableMap job. It will appropriately set up
223    * the job.
224    *
225    * @param table Binary representation of the table name to read from.
226    * @param scan  The scan instance with the columns, time range etc.
227    * @param mapper  The mapper class to use.
228    * @param outputKeyClass  The class of the output key.
229    * @param outputValueClass  The class of the output value.
230    * @param job  The current job to adjust.  Make sure the passed job is
231    * carrying all necessary HBase configuration.
232    * @param addDependencyJars upload HBase jars and jars for any of the configured
233    *           job classes via the distributed cache (tmpjars).
234    * @param inputFormatClass The class of the input format
235    * @throws IOException When setting up the details fails.
236    */
237   public static void initTableMapperJob(byte[] table, Scan scan,
238       Class<? extends TableMapper> mapper,
239       Class<?> outputKeyClass,
240       Class<?> outputValueClass, Job job,
241       boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
242   throws IOException {
243     initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
244         outputValueClass, job, addDependencyJars, inputFormatClass);
245   }
246 
247   /**
248    * Use this before submitting a TableMap job. It will appropriately set up
249    * the job.
250    *
251    * @param table Binary representation of the table name to read from.
252    * @param scan  The scan instance with the columns, time range etc.
253    * @param mapper  The mapper class to use.
254    * @param outputKeyClass  The class of the output key.
255    * @param outputValueClass  The class of the output value.
256    * @param job  The current job to adjust.  Make sure the passed job is
257    * carrying all necessary HBase configuration.
258    * @param addDependencyJars upload HBase jars and jars for any of the configured
259    *           job classes via the distributed cache (tmpjars).
260    * @throws IOException When setting up the details fails.
261    */
262   public static void initTableMapperJob(byte[] table, Scan scan,
263       Class<? extends TableMapper> mapper,
264       Class<?> outputKeyClass,
265       Class<?> outputValueClass, Job job,
266       boolean addDependencyJars)
267   throws IOException {
268     initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
269         outputValueClass, job, addDependencyJars, TableInputFormat.class);
270   }
271 
272   /**
273    * Use this before submitting a TableMap job. It will appropriately set up
274    * the job.
275    *
276    * @param table The table name to read from.
277    * @param scan  The scan instance with the columns, time range etc.
278    * @param mapper  The mapper class to use.
279    * @param outputKeyClass  The class of the output key.
280    * @param outputValueClass  The class of the output value.
281    * @param job  The current job to adjust.  Make sure the passed job is
282    * carrying all necessary HBase configuration.
283    * @param addDependencyJars upload HBase jars and jars for any of the configured
284    *           job classes via the distributed cache (tmpjars).
285    * @throws IOException When setting up the details fails.
286    */
287   public static void initTableMapperJob(String table, Scan scan,
288       Class<? extends TableMapper> mapper,
289       Class<?> outputKeyClass,
290       Class<?> outputValueClass, Job job,
291       boolean addDependencyJars)
292   throws IOException {
293     initTableMapperJob(table, scan, mapper, outputKeyClass,
294         outputValueClass, job, addDependencyJars, TableInputFormat.class);
295   }
296 
297   /**
298    * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
299    * direct memory will likely cause the map tasks to OOM when opening the region. This
300    * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
301    * wants to override this behavior in their job.
302    */
303   public static void resetCacheConfig(Configuration conf) {
304     conf.setFloat(
305       HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
306     conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
307     conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
308   }
309 
310   /**
311    * Sets up the job for reading from one or more table snapshots, with one or more scans
312    * per snapshot.
313    * It bypasses HBase servers and reads directly from the snapshot files.
314    *
315    * @param snapshotScans     map of snapshot name to scans on that snapshot.
316    * @param mapper            The mapper class to use.
317    * @param outputKeyClass    The class of the output key.
318    * @param outputValueClass  The class of the output value.
319    * @param job               The current job to adjust.  Make sure the passed job is
320    *                          carrying all necessary HBase configuration.
321    * @param addDependencyJars upload HBase jars and jars for any of the configured
322    *                          job classes via the distributed cache (tmpjars).
323    */
324   public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
325       Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
326       Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
327     MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
328 
329     job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
330     if (outputValueClass != null) {
331       job.setMapOutputValueClass(outputValueClass);
332     }
333     if (outputKeyClass != null) {
334       job.setMapOutputKeyClass(outputKeyClass);
335     }
336     job.setMapperClass(mapper);
337     Configuration conf = job.getConfiguration();
338     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
339 
340     if (addDependencyJars) {
341       addDependencyJars(job);
342       addDependencyJars(job.getConfiguration(), MetricRegistry.class);
343     }
344 
345     resetCacheConfig(job.getConfiguration());
346   }
347 
348   /**
349    * Sets up the job for reading from a table snapshot. It bypasses HBase servers
350    * and reads directly from the snapshot files.
351    *
352    * @param snapshotName The name of the snapshot (of a table) to read from.
353    * @param scan  The scan instance with the columns, time range etc.
354    * @param mapper  The mapper class to use.
355    * @param outputKeyClass  The class of the output key.
356    * @param outputValueClass  The class of the output value.
357    * @param job  The current job to adjust.  Make sure the passed job is
358    * carrying all necessary HBase configuration.
359    * @param addDependencyJars upload HBase jars and jars for any of the configured
360    *           job classes via the distributed cache (tmpjars).
361    *
362    * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user should
363    * have write permission to this directory, and it should not be a subdirectory of rootdir.
364    * The restore directory can be deleted once the job has finished.
365    * @throws IOException When setting up the details fails.
366    * @see TableSnapshotInputFormat
367    */
368   public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
369       Class<? extends TableMapper> mapper,
370       Class<?> outputKeyClass,
371       Class<?> outputValueClass, Job job,
372       boolean addDependencyJars, Path tmpRestoreDir)
373   throws IOException {
374     TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
375     initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
376         outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
377     resetCacheConfig(job.getConfiguration());
378   }
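  /*
   * Editor's note: hedged usage sketch for reading a snapshot offline. The snapshot name,
   * restore directory and "MyMapper" are hypothetical; the restore directory must be writable
   * by the submitting user and must not live under the HBase root directory.
   *
   *   Job job = Job.getInstance(HBaseConfiguration.create(), "scan snapshot");
   *   Scan scan = new Scan();
   *   TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", scan, MyMapper.class,
   *       ImmutableBytesWritable.class, Result.class, job, true,
   *       new Path("/tmp/snapshot-restore"));
   */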
379 
380   /**
381    * Use this before submitting a Multi TableMap job. It will appropriately set
382    * up the job.
383    *
384    * @param scans The list of {@link Scan} objects to read from.
385    * @param mapper The mapper class to use.
386    * @param outputKeyClass The class of the output key.
387    * @param outputValueClass The class of the output value.
388    * @param job The current job to adjust. Make sure the passed job is carrying
389    *          all necessary HBase configuration.
390    * @throws IOException When setting up the details fails.
391    */
392   public static void initTableMapperJob(List<Scan> scans,
393       Class<? extends TableMapper> mapper,
394       Class<?> outputKeyClass,
395       Class<?> outputValueClass, Job job) throws IOException {
396     initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
397         true);
398   }
399 
400   /**
401    * Use this before submitting a Multi TableMap job. It will appropriately set
402    * up the job.
403    *
404    * @param scans The list of {@link Scan} objects to read from.
405    * @param mapper The mapper class to use.
406    * @param outputKeyClass The class of the output key.
407    * @param outputValueClass The class of the output value.
408    * @param job The current job to adjust. Make sure the passed job is carrying
409    *          all necessary HBase configuration.
410    * @param addDependencyJars upload HBase jars and jars for any of the
411    *          configured job classes via the distributed cache (tmpjars).
412    * @throws IOException When setting up the details fails.
413    */
414   public static void initTableMapperJob(List<Scan> scans,
415       Class<? extends TableMapper> mapper,
416       Class<?> outputKeyClass,
417       Class<?> outputValueClass, Job job,
418       boolean addDependencyJars) throws IOException {
419     initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
420       addDependencyJars, true);
421   }
422 
423   /**
424    * Use this before submitting a Multi TableMap job. It will appropriately set
425    * up the job.
426    *
427    * @param scans The list of {@link Scan} objects to read from.
428    * @param mapper The mapper class to use.
429    * @param outputKeyClass The class of the output key.
430    * @param outputValueClass The class of the output value.
431    * @param job The current job to adjust. Make sure the passed job is carrying
432    *          all necessary HBase configuration.
433    * @param addDependencyJars upload HBase jars and jars for any of the
434    *          configured job classes via the distributed cache (tmpjars).
435    * @param initCredentials whether to initialize hbase auth credentials for the job
436    * @throws IOException When setting up the details fails.
437    */
438   public static void initTableMapperJob(List<Scan> scans,
439       Class<? extends TableMapper> mapper,
440       Class<?> outputKeyClass,
441       Class<?> outputValueClass, Job job,
442       boolean addDependencyJars,
443       boolean initCredentials) throws IOException {
444     job.setInputFormatClass(MultiTableInputFormat.class);
445     if (outputValueClass != null) {
446       job.setMapOutputValueClass(outputValueClass);
447     }
448     if (outputKeyClass != null) {
449       job.setMapOutputKeyClass(outputKeyClass);
450     }
451     job.setMapperClass(mapper);
452     Configuration conf = job.getConfiguration();
453     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
454     List<String> scanStrings = new ArrayList<String>();
455 
456     for (Scan scan : scans) {
457       scanStrings.add(convertScanToString(scan));
458     }
459     job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
460       scanStrings.toArray(new String[scanStrings.size()]));
461 
462     if (addDependencyJars) {
463       addDependencyJars(job);
464     }
465 
466     if (initCredentials) {
467       initCredentials(job);
468     }
469   }
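  /*
   * Editor's note: sketch of a multi-table scan job. Each Scan carries the table it targets
   * via the Scan.SCAN_ATTRIBUTES_TABLE_NAME attribute (an assumption based on how
   * MultiTableInputFormat resolves tables); the table names and "MyMapper" are placeholders.
   *
   *   List<Scan> scans = new ArrayList<Scan>();
   *   for (String tableName : new String[] { "table1", "table2" }) {
   *     Scan scan = new Scan();
   *     scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
   *     scans.add(scan);
   *   }
   *   TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class,
   *       ImmutableBytesWritable.class, Result.class, job, true, true);
   */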
470 
471   public static void initCredentials(Job job) throws IOException {
472     UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
473     if (userProvider.isHadoopSecurityEnabled()) {
474       // propagate delegation related props from launcher job to MR job
475       if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
476         job.getConfiguration().set("mapreduce.job.credentials.binary",
477                                    System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
478       }
479     }
480 
481     if (userProvider.isHBaseSecurityEnabled()) {
482       try {
483         // init credentials for remote cluster
484         String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
485         User user = userProvider.getCurrent();
486         if (quorumAddress != null) {
487           Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
488               quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
489           Connection peerConn = ConnectionFactory.createConnection(peerConf);
490           try {
491             TokenUtil.addTokenForJob(peerConn, user, job);
492           } finally {
493             peerConn.close();
494           }
495         }
496 
497         Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
498         try {
499           TokenUtil.addTokenForJob(conn, user, job);
500         } finally {
501           conn.close();
502         }
503       } catch (InterruptedException ie) {
504         LOG.info("Interrupted obtaining user authentication token");
505         Thread.currentThread().interrupt();
506       }
507     }
508   }
509 
510   /**
511    * Obtain an authentication token, for the specified cluster, on behalf of the current user
512    * and add it to the credentials for the given map reduce job.
513    *
514    * The quorumAddress is the cluster key of the ZK ensemble, built from:
515    * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
516    * zookeeper.znode.parent.
517    *
518    * @param job The job that requires the permission.
519    * @param quorumAddress string that contains the three required configurations
520    * @throws IOException When the authentication token cannot be obtained.
521    * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
522    */
523   @Deprecated
524   public static void initCredentialsForCluster(Job job, String quorumAddress)
525       throws IOException {
526     Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
527         quorumAddress);
528     initCredentialsForCluster(job, peerConf);
529   }
530 
531   /**
532    * Obtain an authentication token, for the specified cluster, on behalf of the current user
533    * and add it to the credentials for the given map reduce job.
534    *
535    * @param job The job that requires the permission.
536    * @param conf The configuration to use in connecting to the peer cluster
537    * @throws IOException When the authentication token cannot be obtained.
538    */
539   public static void initCredentialsForCluster(Job job, Configuration conf)
540       throws IOException {
541     UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
542     if (userProvider.isHBaseSecurityEnabled()) {
543       try {
544         Connection peerConn = ConnectionFactory.createConnection(conf);
545         try {
546           TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
547         } finally {
548           peerConn.close();
549         }
550       } catch (InterruptedException e) {
551         LOG.info("Interrupted obtaining user authentication token");
552         Thread.currentThread().interrupt();
553       }
554     }
555   }
556 
557   /**
558    * Writes the given scan into a Base64 encoded string.
559    *
560    * @param scan  The scan to write out.
561    * @return The scan saved in a Base64 encoded string.
562    * @throws IOException When writing the scan fails.
563    */
564   public static String convertScanToString(Scan scan) throws IOException {
565     ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
566     return Base64.encodeBytes(proto.toByteArray());
567   }
568 
569   /**
570    * Converts the given Base64 string back into a Scan instance.
571    *
572    * @param base64  The scan details.
573    * @return The newly created Scan instance.
574    * @throws IOException When reading the scan instance fails.
575    */
576   public static Scan convertStringToScan(String base64) throws IOException {
577     byte [] decoded = Base64.decode(base64);
578     ClientProtos.Scan scan;
579     try {
580       scan = ClientProtos.Scan.parseFrom(decoded);
581     } catch (InvalidProtocolBufferException ipbe) {
582       throw new IOException(ipbe);
583     }
584 
585     return ProtobufUtil.toScan(scan);
586   }
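  /*
   * Editor's note: the two helpers above are inverses; a Scan can be round-tripped through the
   * Base64 form that TableInputFormat stores in the job configuration. Sketch only; the column
   * family name is a placeholder.
   *
   *   Scan scan = new Scan();
   *   scan.addFamily(Bytes.toBytes("cf"));
   *   String encoded = TableMapReduceUtil.convertScanToString(scan);
   *   conf.set(TableInputFormat.SCAN, encoded);          // what initTableMapperJob does
   *   Scan decoded = TableMapReduceUtil.convertStringToScan(encoded);
   */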
587 
588   /**
589    * Use this before submitting a TableReduce job. It will
590    * appropriately set up the JobConf.
591    *
592    * @param table  The output table.
593    * @param reducer  The reducer class to use.
594    * @param job  The current job to adjust.
595    * @throws IOException When determining the region count fails.
596    */
597   public static void initTableReducerJob(String table,
598     Class<? extends TableReducer> reducer, Job job)
599   throws IOException {
600     initTableReducerJob(table, reducer, job, null);
601   }
602 
603   /**
604    * Use this before submitting a TableReduce job. It will
605    * appropriately set up the JobConf.
606    *
607    * @param table  The output table.
608    * @param reducer  The reducer class to use.
609    * @param job  The current job to adjust.
610    * @param partitioner  Partitioner to use. Pass <code>null</code> to use
611    * default partitioner.
612    * @throws IOException When determining the region count fails.
613    */
614   public static void initTableReducerJob(String table,
615     Class<? extends TableReducer> reducer, Job job,
616     Class partitioner) throws IOException {
617     initTableReducerJob(table, reducer, job, partitioner, null, null, null);
618   }
619 
620   /**
621    * Use this before submitting a TableReduce job. It will
622    * appropriately set up the JobConf.
623    *
624    * @param table  The output table.
625    * @param reducer  The reducer class to use.
626    * @param job  The current job to adjust.  Make sure the passed job is
627    * carrying all necessary HBase configuration.
628    * @param partitioner  Partitioner to use. Pass <code>null</code> to use
629    * default partitioner.
630    * @param quorumAddress Distant cluster to write to; default is null for
631    * output to the cluster that is designated in <code>hbase-site.xml</code>.
632    * Set this String to the zookeeper ensemble of an alternate remote cluster
633    * when you want the reduce to write to a cluster other than the
634    * default; e.g. when copying tables between clusters, the source is
635    * designated by <code>hbase-site.xml</code> and this parameter carries the
636    * ensemble address of the remote cluster.  The format to pass is particular.
637    * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
638    *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
639    * </code> such as <code>server,server2,server3:2181:/hbase</code>.
640    * @param serverClass redefined hbase.regionserver.class
641    * @param serverImpl redefined hbase.regionserver.impl
642    * @throws IOException When determining the region count fails.
643    */
644   public static void initTableReducerJob(String table,
645     Class<? extends TableReducer> reducer, Job job,
646     Class partitioner, String quorumAddress, String serverClass,
647     String serverImpl) throws IOException {
648     initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
649         serverClass, serverImpl, true);
650   }
651 
652   /**
653    * Use this before submitting a TableReduce job. It will
654    * appropriately set up the JobConf.
655    *
656    * @param table  The output table.
657    * @param reducer  The reducer class to use.
658    * @param job  The current job to adjust.  Make sure the passed job is
659    * carrying all necessary HBase configuration.
660    * @param partitioner  Partitioner to use. Pass <code>null</code> to use
661    * default partitioner.
662    * @param quorumAddress Distant cluster to write to; default is null for
663    * output to the cluster that is designated in <code>hbase-site.xml</code>.
664    * Set this String to the zookeeper ensemble of an alternate remote cluster
665    * when you want the reduce to write to a cluster other than the
666    * default; e.g. when copying tables between clusters, the source is
667    * designated by <code>hbase-site.xml</code> and this parameter carries the
668    * ensemble address of the remote cluster.  The format to pass is particular.
669    * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
670    *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
671    * </code> such as <code>server,server2,server3:2181:/hbase</code>.
672    * @param serverClass redefined hbase.regionserver.class
673    * @param serverImpl redefined hbase.regionserver.impl
674    * @param addDependencyJars upload HBase jars and jars for any of the configured
675    *           job classes via the distributed cache (tmpjars).
676    * @throws IOException When determining the region count fails.
677    */
678   public static void initTableReducerJob(String table,
679     Class<? extends TableReducer> reducer, Job job,
680     Class partitioner, String quorumAddress, String serverClass,
681     String serverImpl, boolean addDependencyJars) throws IOException {
682 
683     Configuration conf = job.getConfiguration();
684     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
685     job.setOutputFormatClass(TableOutputFormat.class);
686     if (reducer != null) job.setReducerClass(reducer);
687     conf.set(TableOutputFormat.OUTPUT_TABLE, table);
688     conf.setStrings("io.serializations", conf.get("io.serializations"),
689         MutationSerialization.class.getName(), ResultSerialization.class.getName());
690     // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
691     if (quorumAddress != null) {
692       // Calling this will validate the format
693       ZKConfig.validateClusterKey(quorumAddress);
694       conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
695     }
696     if (serverClass != null && serverImpl != null) {
697       conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
698       conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
699     }
700     job.setOutputKeyClass(ImmutableBytesWritable.class);
701     job.setOutputValueClass(Writable.class);
702     if (partitioner == HRegionPartitioner.class) {
703       job.setPartitionerClass(HRegionPartitioner.class);
704       int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
705       if (job.getNumReduceTasks() > regions) {
706         job.setNumReduceTasks(regions);
707       }
708     } else if (partitioner != null) {
709       job.setPartitionerClass(partitioner);
710     }
711 
712     if (addDependencyJars) {
713       addDependencyJars(job);
714     }
715 
716     initCredentials(job);
717   }
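  /*
   * Editor's note: hedged sketch of wiring a reduce phase that writes to a different cluster.
   * The quorum address follows the cluster-key format documented above
   * (quorum:clientPort:znodeParent); the host names, table and "MyReducer" are placeholders.
   *
   *   TableMapReduceUtil.initTableReducerJob("output_table", MyReducer.class, job,
   *       null,                                   // default partitioner
   *       "zk1,zk2,zk3:2181:/hbase",              // remote cluster key, or null for local
   *       null, null);
   */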
718 
719   /**
720    * Ensures that the given number of reduce tasks for the given job
721    * configuration does not exceed the number of regions for the given table.
722    *
723    * @param table  The table to get the region count for.
724    * @param job  The current job to adjust.
725    * @throws IOException When retrieving the table details fails.
726    */
727   public static void limitNumReduceTasks(String table, Job job)
728   throws IOException {
729     int regions =
730       MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
731     if (job.getNumReduceTasks() > regions)
732       job.setNumReduceTasks(regions);
733   }
734 
735   /**
736    * Sets the number of reduce tasks for the given job configuration to the
737    * number of regions the given table has.
738    *
739    * @param table  The table to get the region count for.
740    * @param job  The current job to adjust.
741    * @throws IOException When retrieving the table details fails.
742    */
743   public static void setNumReduceTasks(String table, Job job)
744   throws IOException {
745     job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
746        TableName.valueOf(table)));
747   }
748 
749   /**
750    * Sets the number of rows to return and cache with each scanner iteration.
751    * Higher caching values will enable faster mapreduce jobs at the expense of
752    * requiring more heap to contain the cached rows.
753    *
754    * @param job The current job to adjust.
755    * @param batchSize The number of rows to return in batch with each scanner
756    * iteration.
757    */
758   public static void setScannerCaching(Job job, int batchSize) {
759     job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
760   }
761 
762   /**
763    * Add HBase and its dependencies (only) to the job configuration.
764    * <p>
765    * This is intended as a low-level API, facilitating code reuse between this
766    * class and its mapred counterpart. It also of use to external tools that
767    * need to build a MapReduce job that interacts with HBase but want
768    * fine-grained control over the jars shipped to the cluster.
769    * </p>
770    * @param conf The Configuration object to extend with dependencies.
771    * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
772    * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
773    */
774   public static void addHBaseDependencyJars(Configuration conf) throws IOException {
775 
776     // PrefixTreeCodec is part of the hbase-prefix-tree module. If not included in MR jobs jar
777     // dependencies, MR jobs that write encoded hfiles will fail.
778    // We use reflection here to prevent a circular module dependency.
779     // TODO - if we extract the MR into a module, make it depend on hbase-prefix-tree.
780     Class prefixTreeCodecClass = null;
781     try {
782       prefixTreeCodecClass =
783           Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
784     } catch (ClassNotFoundException e) {
785       // this will show up in unit tests but should not show in real deployments
786       LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." +
787           "  Continuing without it.");
788     }
789 
790     addDependencyJars(conf,
791       // explicitly pull a class from each module
792       org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
793       org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
794       org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
795       org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
796       org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-server
797       prefixTreeCodecClass, //  hbase-prefix-tree (if null will be skipped)
798       // pull necessary dependencies
799       org.apache.zookeeper.ZooKeeper.class,
800       io.netty.channel.Channel.class,
801       com.google.protobuf.Message.class,
802       com.google.common.collect.Lists.class,
803       org.apache.htrace.Trace.class,
804       com.codahale.metrics.MetricRegistry.class);
805   }
806 
807   /**
808    * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
809    * Also exposed to shell scripts via `bin/hbase mapredcp`.
810    */
811   public static String buildDependencyClasspath(Configuration conf) {
812     if (conf == null) {
813       throw new IllegalArgumentException("Must provide a configuration object.");
814     }
815     Set<String> paths = new HashSet<String>(conf.getStringCollection("tmpjars"));
816     if (paths.size() == 0) {
817       throw new IllegalArgumentException("Configuration contains no tmpjars.");
818     }
819     StringBuilder sb = new StringBuilder();
820     for (String s : paths) {
821       // entries can take the form 'file:/path/to/file.jar'.
822       int idx = s.indexOf(":");
823       if (idx != -1) s = s.substring(idx + 1);
824       if (sb.length() > 0) sb.append(File.pathSeparator);
825       sb.append(s);
826     }
827     return sb.toString();
828   }
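  /*
   * Editor's note: this classpath is what the `bin/hbase mapredcp` command mentioned above
   * prints. A common (assumed) shell pattern for plain `hadoop jar` submissions, with a
   * hypothetical job jar and driver class, is:
   *
   *   HADOOP_CLASSPATH=$(hbase mapredcp) hadoop jar my-job.jar MyDriver ...
   */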
829 
830   /**
831    * Add the HBase dependency jars as well as jars for any of the configured
832    * job classes to the job configuration, so that JobClient will ship them
833    * to the cluster and add them to the DistributedCache.
834    */
835   public static void addDependencyJars(Job job) throws IOException {
836     addHBaseDependencyJars(job.getConfiguration());
837     try {
838       addDependencyJars(job.getConfiguration(),
839           // when making changes here, consider also mapred.TableMapReduceUtil
840           // pull job classes
841           job.getMapOutputKeyClass(),
842           job.getMapOutputValueClass(),
843           job.getInputFormatClass(),
844           job.getOutputKeyClass(),
845           job.getOutputValueClass(),
846           job.getOutputFormatClass(),
847           job.getPartitionerClass(),
848           job.getCombinerClass());
849     } catch (ClassNotFoundException e) {
850       throw new IOException(e);
851     }
852   }
853 
854   /**
855    * Add the jars containing the given classes to the job's configuration
856    * such that JobClient will ship them to the cluster and add them to
857    * the DistributedCache.
858    */
859   public static void addDependencyJars(Configuration conf,
860       Class<?>... classes) throws IOException {
861 
862     FileSystem localFs = FileSystem.getLocal(conf);
863     Set<String> jars = new HashSet<String>();
864     // Add jars that are already in the tmpjars variable
865     jars.addAll(conf.getStringCollection("tmpjars"));
866 
867     // add jars as we find them to a map of contents jar name so that we can avoid
868     // creating new jars for classes that have already been packaged.
869     Map<String, String> packagedClasses = new HashMap<String, String>();
870 
871     // Add jars containing the specified classes
872     for (Class<?> clazz : classes) {
873       if (clazz == null) continue;
874 
875       Path path = findOrCreateJar(clazz, localFs, packagedClasses);
876       if (path == null) {
877         LOG.warn("Could not find jar for class " + clazz +
878                  " in order to ship it to the cluster.");
879         continue;
880       }
881       if (!localFs.exists(path)) {
882         LOG.warn("Could not validate jar file " + path + " for class "
883                  + clazz);
884         continue;
885       }
886       jars.add(path.toString());
887     }
888     if (jars.isEmpty()) return;
889 
890     conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
891   }
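  /*
   * Editor's note: sketch of shipping a third-party or user jar in addition to the HBase
   * dependencies, e.g. when a Scan uses a custom filter class; "MyCustomFilter" is hypothetical.
   *
   *   TableMapReduceUtil.addDependencyJars(job);                       // HBase + job classes
   *   TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
   *       MyCustomFilter.class);                                       // extra user classes
   */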
892 
893   /**
894    * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
895    * a directory in the classpath, it creates a Jar on the fly with the
896    * contents of the directory and returns the path to that Jar. If a Jar is
897    * created, it is created in the system temporary directory. Otherwise,
898    * returns an existing jar that contains a class of the same name. Maintains
899    * a mapping from jar contents to the tmp jar created.
900    * @param my_class the class to find.
901    * @param fs the FileSystem with which to qualify the returned path.
902    * @param packagedClasses a map of class name to path.
903    * @return a jar file that contains the class.
904    * @throws IOException
905    */
906   private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
907       Map<String, String> packagedClasses)
908   throws IOException {
909     // attempt to locate an existing jar for the class.
910     String jar = findContainingJar(my_class, packagedClasses);
911     if (null == jar || jar.isEmpty()) {
912       jar = getJar(my_class);
913       updateMap(jar, packagedClasses);
914     }
915 
916     if (null == jar || jar.isEmpty()) {
917       return null;
918     }
919 
920     LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
921     return new Path(jar).makeQualified(fs);
922   }
923 
924   /**
925    * Add entries to <code>packagedClasses</code> corresponding to class files
926    * contained in <code>jar</code>.
927    * @param jar The jar whose contents to list.
928    * @param packagedClasses map[class -> jar]
929    */
930   private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
931     if (null == jar || jar.isEmpty()) {
932       return;
933     }
934     ZipFile zip = null;
935     try {
936       zip = new ZipFile(jar);
937       for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
938         ZipEntry entry = iter.nextElement();
939         if (entry.getName().endsWith("class")) {
940           packagedClasses.put(entry.getName(), jar);
941         }
942       }
943     } finally {
944       if (null != zip) zip.close();
945     }
946   }
947 
948   /**
949    * Find a jar that contains a class of the same name, if any. It will return
950    * a jar file, even if that is not the first thing on the class path that
951    * has a class with the same name. Looks first on the classpath and then in
952    * the <code>packagedClasses</code> map.
953    * @param my_class the class to find.
954    * @return a jar file that contains the class, or null.
955    * @throws IOException
956    */
957   private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
958       throws IOException {
959     ClassLoader loader = my_class.getClassLoader();
960 
961     String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
962 
963     if (loader != null) {
964       // first search the classpath
965       for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
966         URL url = itr.nextElement();
967         if ("jar".equals(url.getProtocol())) {
968           String toReturn = url.getPath();
969           if (toReturn.startsWith("file:")) {
970             toReturn = toReturn.substring("file:".length());
971           }
972           // URLDecoder is a misnamed class, since it actually decodes
973           // x-www-form-urlencoded MIME type rather than actual
974           // URL encoding (which the file path has). Therefore it would
975           // decode +s to ' 's which is incorrect (spaces are actually
976           // either unencoded or encoded as "%20"). Replace +s first, so
977           // that they are kept sacred during the decoding process.
978           toReturn = toReturn.replaceAll("\\+", "%2B");
979           toReturn = URLDecoder.decode(toReturn, "UTF-8");
980           return toReturn.replaceAll("!.*$", "");
981         }
982       }
983     }
984 
985     // now look in any jars we've packaged using JarFinder. Returns null when
986     // no jar is found.
987     return packagedClasses.get(class_file);
988   }
989 
990   /**
991    * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
992    * configuration contexts (HBASE-8140) and also for testing on MRv2;
993    * check whether HADOOP-9426 is available.
994    * @param my_class the class to find.
995    * @return a jar file that contains the class, or null.
996    */
997   private static String getJar(Class<?> my_class) {
998     String ret = null;
999     try {
1000       ret = JarFinder.getJar(my_class);
1001     } catch (Exception e) {
1002       // rethrow anything else; failures here are related to reflection
1003       throw new RuntimeException("getJar invocation failed.", e);
1004     }
1005 
1006     return ret;
1007   }
1008 }