001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.File;
022import java.io.IOException;
023import java.net.URL;
024import java.net.URLDecoder;
025import java.util.ArrayList;
026import java.util.Base64;
027import java.util.Collection;
028import java.util.Enumeration;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.zip.ZipEntry;
035import java.util.zip.ZipFile;
036
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046import org.apache.hadoop.hbase.client.Connection;
047import org.apache.hadoop.hbase.client.ConnectionFactory;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.RegionLocator;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
054import org.apache.hadoop.hbase.security.User;
055import org.apache.hadoop.hbase.security.UserProvider;
056import org.apache.hadoop.hbase.security.token.TokenUtil;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.RegionSplitter;
059import org.apache.hadoop.hbase.zookeeper.ZKConfig;
060import org.apache.hadoop.io.Writable;
061import org.apache.hadoop.mapreduce.InputFormat;
062import org.apache.hadoop.mapreduce.Job;
063import org.apache.hadoop.util.StringUtils;
064
065import com.codahale.metrics.MetricRegistry;
066
067/**
068 * Utility for {@link TableMapper} and {@link TableReducer}
069 */
070@SuppressWarnings({ "rawtypes", "unchecked" })
071@InterfaceAudience.Public
072public class TableMapReduceUtil {
073  private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);
074
075  /**
076   * Use this before submitting a TableMap job. It will appropriately set up
077   * the job.
078   *
079   * @param table  The table name to read from.
080   * @param scan  The scan instance with the columns, time range etc.
081   * @param mapper  The mapper class to use.
082   * @param outputKeyClass  The class of the output key.
083   * @param outputValueClass  The class of the output value.
084   * @param job  The current job to adjust.  Make sure the passed job is
085   * carrying all necessary HBase configuration.
086   * @throws IOException When setting up the details fails.
087   */
088  public static void initTableMapperJob(String table, Scan scan,
089      Class<? extends TableMapper> mapper,
090      Class<?> outputKeyClass,
091      Class<?> outputValueClass, Job job)
092  throws IOException {
093    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
094        job, true);
095  }
096
097
098  /**
099   * Use this before submitting a TableMap job. It will appropriately set up
100   * the job.
101   *
102   * @param table  The table name to read from.
103   * @param scan  The scan instance with the columns, time range etc.
104   * @param mapper  The mapper class to use.
105   * @param outputKeyClass  The class of the output key.
106   * @param outputValueClass  The class of the output value.
107   * @param job  The current job to adjust.  Make sure the passed job is
108   * carrying all necessary HBase configuration.
109   * @throws IOException When setting up the details fails.
110   */
111  public static void initTableMapperJob(TableName table,
112      Scan scan,
113      Class<? extends TableMapper> mapper,
114      Class<?> outputKeyClass,
115      Class<?> outputValueClass,
116      Job job) throws IOException {
117    initTableMapperJob(table.getNameAsString(),
118        scan,
119        mapper,
120        outputKeyClass,
121        outputValueClass,
122        job,
123        true);
124  }
125
126  /**
127   * Use this before submitting a TableMap job. It will appropriately set up
128   * the job.
129   *
130   * @param table Binary representation of the table name to read from.
131   * @param scan  The scan instance with the columns, time range etc.
132   * @param mapper  The mapper class to use.
133   * @param outputKeyClass  The class of the output key.
134   * @param outputValueClass  The class of the output value.
135   * @param job  The current job to adjust.  Make sure the passed job is
136   * carrying all necessary HBase configuration.
137   * @throws IOException When setting up the details fails.
138   */
139   public static void initTableMapperJob(byte[] table, Scan scan,
140      Class<? extends TableMapper> mapper,
141      Class<?> outputKeyClass,
142      Class<?> outputValueClass, Job job)
143  throws IOException {
144      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
145              job, true);
146  }
147
148   /**
149    * Use this before submitting a TableMap job. It will appropriately set up
150    * the job.
151    *
152    * @param table  The table name to read from.
153    * @param scan  The scan instance with the columns, time range etc.
154    * @param mapper  The mapper class to use.
155    * @param outputKeyClass  The class of the output key.
156    * @param outputValueClass  The class of the output value.
157    * @param job  The current job to adjust.  Make sure the passed job is
158    * carrying all necessary HBase configuration.
159    * @param addDependencyJars upload HBase jars and jars for any of the configured
160    *           job classes via the distributed cache (tmpjars).
161    * @throws IOException When setting up the details fails.
162    */
163   public static void initTableMapperJob(String table, Scan scan,
164       Class<? extends TableMapper> mapper,
165       Class<?> outputKeyClass,
166       Class<?> outputValueClass, Job job,
167       boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
168   throws IOException {
169     initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
170         addDependencyJars, true, inputFormatClass);
171   }
172
173
174  /**
175   * Use this before submitting a TableMap job. It will appropriately set up
176   * the job.
177   *
178   * @param table  The table name to read from.
179   * @param scan  The scan instance with the columns, time range etc.
180   * @param mapper  The mapper class to use.
181   * @param outputKeyClass  The class of the output key.
182   * @param outputValueClass  The class of the output value.
183   * @param job  The current job to adjust.  Make sure the passed job is
184   * carrying all necessary HBase configuration.
185   * @param addDependencyJars upload HBase jars and jars for any of the configured
186   *           job classes via the distributed cache (tmpjars).
187   * @param initCredentials whether to initialize hbase auth credentials for the job
188   * @param inputFormatClass the input format
189   * @throws IOException When setting up the details fails.
190   */
191  public static void initTableMapperJob(String table, Scan scan,
192      Class<? extends TableMapper> mapper,
193      Class<?> outputKeyClass,
194      Class<?> outputValueClass, Job job,
195      boolean addDependencyJars, boolean initCredentials,
196      Class<? extends InputFormat> inputFormatClass)
197  throws IOException {
198    job.setInputFormatClass(inputFormatClass);
199    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
200    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
201    job.setMapperClass(mapper);
202    if (Put.class.equals(outputValueClass)) {
203      job.setCombinerClass(PutCombiner.class);
204    }
205    Configuration conf = job.getConfiguration();
206    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
207    conf.set(TableInputFormat.INPUT_TABLE, table);
208    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
209    conf.setStrings("io.serializations", conf.get("io.serializations"),
210        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
211        CellSerialization.class.getName());
212    if (addDependencyJars) {
213      addDependencyJars(job);
214    }
215    if (initCredentials) {
216      initCredentials(job);
217    }
218  }
219
220  /**
221   * Use this before submitting a TableMap job. It will appropriately set up
222   * the job.
223   *
224   * @param table Binary representation of the table name to read from.
225   * @param scan  The scan instance with the columns, time range etc.
226   * @param mapper  The mapper class to use.
227   * @param outputKeyClass  The class of the output key.
228   * @param outputValueClass  The class of the output value.
229   * @param job  The current job to adjust.  Make sure the passed job is
230   * carrying all necessary HBase configuration.
231   * @param addDependencyJars upload HBase jars and jars for any of the configured
232   *           job classes via the distributed cache (tmpjars).
233   * @param inputFormatClass The class of the input format
234   * @throws IOException When setting up the details fails.
235   */
236  public static void initTableMapperJob(byte[] table, Scan scan,
237      Class<? extends TableMapper> mapper,
238      Class<?> outputKeyClass,
239      Class<?> outputValueClass, Job job,
240      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
241  throws IOException {
242      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
243              outputValueClass, job, addDependencyJars, inputFormatClass);
244  }
245
246  /**
247   * Use this before submitting a TableMap job. It will appropriately set up
248   * the job.
249   *
250   * @param table Binary representation of the table name to read from.
251   * @param scan  The scan instance with the columns, time range etc.
252   * @param mapper  The mapper class to use.
253   * @param outputKeyClass  The class of the output key.
254   * @param outputValueClass  The class of the output value.
255   * @param job  The current job to adjust.  Make sure the passed job is
256   * carrying all necessary HBase configuration.
257   * @param addDependencyJars upload HBase jars and jars for any of the configured
258   *           job classes via the distributed cache (tmpjars).
259   * @throws IOException When setting up the details fails.
260   */
261  public static void initTableMapperJob(byte[] table, Scan scan,
262      Class<? extends TableMapper> mapper,
263      Class<?> outputKeyClass,
264      Class<?> outputValueClass, Job job,
265      boolean addDependencyJars)
266  throws IOException {
267      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
268              outputValueClass, job, addDependencyJars, TableInputFormat.class);
269  }
270
271  /**
272   * Use this before submitting a TableMap job. It will appropriately set up
273   * the job.
274   *
275   * @param table The table name to read from.
276   * @param scan  The scan instance with the columns, time range etc.
277   * @param mapper  The mapper class to use.
278   * @param outputKeyClass  The class of the output key.
279   * @param outputValueClass  The class of the output value.
280   * @param job  The current job to adjust.  Make sure the passed job is
281   * carrying all necessary HBase configuration.
282   * @param addDependencyJars upload HBase jars and jars for any of the configured
283   *           job classes via the distributed cache (tmpjars).
284   * @throws IOException When setting up the details fails.
285   */
286  public static void initTableMapperJob(String table, Scan scan,
287      Class<? extends TableMapper> mapper,
288      Class<?> outputKeyClass,
289      Class<?> outputValueClass, Job job,
290      boolean addDependencyJars)
291  throws IOException {
292      initTableMapperJob(table, scan, mapper, outputKeyClass,
293              outputValueClass, job, addDependencyJars, TableInputFormat.class);
294  }
295
296  /**
297   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
298   * direct memory will likely cause the map tasks to OOM when opening the region. This
299   * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
300   * wants to override this behavior in their job.
301   */
302  public static void resetCacheConfig(Configuration conf) {
303    conf.setFloat(
304      HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
305    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
306    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
307  }
308
309  /**
310   * Sets up the job for reading from one or more table snapshots, with one or more scans
311   * per snapshot.
312   * It bypasses hbase servers and read directly from snapshot files.
313   *
314   * @param snapshotScans     map of snapshot name to scans on that snapshot.
315   * @param mapper            The mapper class to use.
316   * @param outputKeyClass    The class of the output key.
317   * @param outputValueClass  The class of the output value.
318   * @param job               The current job to adjust.  Make sure the passed job is
319   *                          carrying all necessary HBase configuration.
320   * @param addDependencyJars upload HBase jars and jars for any of the configured
321   *                          job classes via the distributed cache (tmpjars).
322   */
323  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
324      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
325      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
326    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);
327
328    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
329    if (outputValueClass != null) {
330      job.setMapOutputValueClass(outputValueClass);
331    }
332    if (outputKeyClass != null) {
333      job.setMapOutputKeyClass(outputKeyClass);
334    }
335    job.setMapperClass(mapper);
336    Configuration conf = job.getConfiguration();
337    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
338
339    if (addDependencyJars) {
340      addDependencyJars(job);
341      addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class);
342    }
343
344    resetCacheConfig(job.getConfiguration());
345  }
346
347  /**
348   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and read directly
349   * from snapshot files.
350   * @param snapshotName The name of the snapshot (of a table) to read from.
351   * @param scan The scan instance with the columns, time range etc.
352   * @param mapper The mapper class to use.
353   * @param outputKeyClass The class of the output key.
354   * @param outputValueClass The class of the output value.
355   * @param job The current job to adjust. Make sure the passed job is carrying all necessary HBase
356   *          configuration.
357   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
358   *          the distributed cache (tmpjars).
359   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
360   *          have write permissions to this directory, and this should not be a subdirectory of
361   *          rootdir. After the job is finished, restore directory can be deleted.
362   * @throws IOException When setting up the details fails.
363   * @see TableSnapshotInputFormat
364   */
365  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
366      Class<? extends TableMapper> mapper,
367      Class<?> outputKeyClass,
368      Class<?> outputValueClass, Job job,
369      boolean addDependencyJars, Path tmpRestoreDir)
370      throws IOException {
371    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
372    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
373      addDependencyJars, false, TableSnapshotInputFormat.class);
374    resetCacheConfig(job.getConfiguration());
375  }
376
377  /**
378   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
379   * and read directly from snapshot files.
380   *
381   * @param snapshotName The name of the snapshot (of a table) to read from.
382   * @param scan  The scan instance with the columns, time range etc.
383   * @param mapper  The mapper class to use.
384   * @param outputKeyClass  The class of the output key.
385   * @param outputValueClass  The class of the output value.
386   * @param job  The current job to adjust.  Make sure the passed job is
387   * carrying all necessary HBase configuration.
388   * @param addDependencyJars upload HBase jars and jars for any of the configured
389   *           job classes via the distributed cache (tmpjars).
390   *
391   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
392   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
393   * After the job is finished, restore directory can be deleted.
394   * @param splitAlgo algorithm to split
395   * @param numSplitsPerRegion how many input splits to generate per one region
396   * @throws IOException When setting up the details fails.
397   * @see TableSnapshotInputFormat
398   */
399  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
400                                                Class<? extends TableMapper> mapper,
401                                                Class<?> outputKeyClass,
402                                                Class<?> outputValueClass, Job job,
403                                                boolean addDependencyJars, Path tmpRestoreDir,
404                                                RegionSplitter.SplitAlgorithm splitAlgo,
405                                                int numSplitsPerRegion)
406          throws IOException {
407    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
408            numSplitsPerRegion);
409    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
410            outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
411    resetCacheConfig(job.getConfiguration());
412  }
413
414  /**
415   * Use this before submitting a Multi TableMap job. It will appropriately set
416   * up the job.
417   *
418   * @param scans The list of {@link Scan} objects to read from.
419   * @param mapper The mapper class to use.
420   * @param outputKeyClass The class of the output key.
421   * @param outputValueClass The class of the output value.
422   * @param job The current job to adjust. Make sure the passed job is carrying
423   *          all necessary HBase configuration.
424   * @throws IOException When setting up the details fails.
425   */
426  public static void initTableMapperJob(List<Scan> scans,
427      Class<? extends TableMapper> mapper,
428      Class<?> outputKeyClass,
429      Class<?> outputValueClass, Job job) throws IOException {
430    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
431        true);
432  }
433
434  /**
435   * Use this before submitting a Multi TableMap job. It will appropriately set
436   * up the job.
437   *
438   * @param scans The list of {@link Scan} objects to read from.
439   * @param mapper The mapper class to use.
440   * @param outputKeyClass The class of the output key.
441   * @param outputValueClass The class of the output value.
442   * @param job The current job to adjust. Make sure the passed job is carrying
443   *          all necessary HBase configuration.
444   * @param addDependencyJars upload HBase jars and jars for any of the
445   *          configured job classes via the distributed cache (tmpjars).
446   * @throws IOException When setting up the details fails.
447   */
448  public static void initTableMapperJob(List<Scan> scans,
449      Class<? extends TableMapper> mapper,
450      Class<?> outputKeyClass,
451      Class<?> outputValueClass, Job job,
452      boolean addDependencyJars) throws IOException {
453    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
454      addDependencyJars, true);
455  }
456
457  /**
458   * Use this before submitting a Multi TableMap job. It will appropriately set
459   * up the job.
460   *
461   * @param scans The list of {@link Scan} objects to read from.
462   * @param mapper The mapper class to use.
463   * @param outputKeyClass The class of the output key.
464   * @param outputValueClass The class of the output value.
465   * @param job The current job to adjust. Make sure the passed job is carrying
466   *          all necessary HBase configuration.
467   * @param addDependencyJars upload HBase jars and jars for any of the
468   *          configured job classes via the distributed cache (tmpjars).
469   * @param initCredentials whether to initialize hbase auth credentials for the job
470   * @throws IOException When setting up the details fails.
471   */
472  public static void initTableMapperJob(List<Scan> scans,
473      Class<? extends TableMapper> mapper,
474      Class<?> outputKeyClass,
475      Class<?> outputValueClass, Job job,
476      boolean addDependencyJars,
477      boolean initCredentials) throws IOException {
478    job.setInputFormatClass(MultiTableInputFormat.class);
479    if (outputValueClass != null) {
480      job.setMapOutputValueClass(outputValueClass);
481    }
482    if (outputKeyClass != null) {
483      job.setMapOutputKeyClass(outputKeyClass);
484    }
485    job.setMapperClass(mapper);
486    Configuration conf = job.getConfiguration();
487    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
488    List<String> scanStrings = new ArrayList<>();
489
490    for (Scan scan : scans) {
491      scanStrings.add(convertScanToString(scan));
492    }
493    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
494      scanStrings.toArray(new String[scanStrings.size()]));
495
496    if (addDependencyJars) {
497      addDependencyJars(job);
498    }
499
500    if (initCredentials) {
501      initCredentials(job);
502    }
503  }
504
505  public static void initCredentials(Job job) throws IOException {
506    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
507    if (userProvider.isHadoopSecurityEnabled()) {
508      // propagate delegation related props from launcher job to MR job
509      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
510        job.getConfiguration().set("mapreduce.job.credentials.binary",
511                                   System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
512      }
513    }
514
515    if (userProvider.isHBaseSecurityEnabled()) {
516      try {
517        // init credentials for remote cluster
518        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
519        User user = userProvider.getCurrent();
520        if (quorumAddress != null) {
521          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
522              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
523          Connection peerConn = ConnectionFactory.createConnection(peerConf);
524          try {
525            TokenUtil.addTokenForJob(peerConn, user, job);
526          } finally {
527            peerConn.close();
528          }
529        }
530
531        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
532        try {
533          TokenUtil.addTokenForJob(conn, user, job);
534        } finally {
535          conn.close();
536        }
537      } catch (InterruptedException ie) {
538        LOG.info("Interrupted obtaining user authentication token");
539        Thread.currentThread().interrupt();
540      }
541    }
542  }
543
544  /**
545   * Obtain an authentication token, for the specified cluster, on behalf of the current user
546   * and add it to the credentials for the given map reduce job.
547   *
548   * The quorumAddress is the key to the ZK ensemble, which contains:
549   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
550   * zookeeper.znode.parent
551   *
552   * @param job The job that requires the permission.
553   * @param quorumAddress string that contains the 3 required configuratins
554   * @throws IOException When the authentication token cannot be obtained.
555   * @deprecated Since 1.2.0 and will be removed in 3.0.0. Use
556   *   {@link #initCredentialsForCluster(Job, Configuration)} instead.
557   * @see #initCredentialsForCluster(Job, Configuration)
558   * @see <a href="https://issues.apache.org/jira/browse/HBASE-14886">HBASE-14886</a>
559   */
560  @Deprecated
561  public static void initCredentialsForCluster(Job job, String quorumAddress)
562      throws IOException {
563    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
564        quorumAddress);
565    initCredentialsForCluster(job, peerConf);
566  }
567
568  /**
569   * Obtain an authentication token, for the specified cluster, on behalf of the current user
570   * and add it to the credentials for the given map reduce job.
571   *
572   * @param job The job that requires the permission.
573   * @param conf The configuration to use in connecting to the peer cluster
574   * @throws IOException When the authentication token cannot be obtained.
575   */
576  public static void initCredentialsForCluster(Job job, Configuration conf)
577      throws IOException {
578    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
579    if (userProvider.isHBaseSecurityEnabled()) {
580      try {
581        Connection peerConn = ConnectionFactory.createConnection(conf);
582        try {
583          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
584        } finally {
585          peerConn.close();
586        }
587      } catch (InterruptedException e) {
588        LOG.info("Interrupted obtaining user authentication token");
589        Thread.interrupted();
590      }
591    }
592  }
593
594  /**
595   * Writes the given scan into a Base64 encoded string.
596   *
597   * @param scan  The scan to write out.
598   * @return The scan saved in a Base64 encoded string.
599   * @throws IOException When writing the scan fails.
600   */
601  public static String convertScanToString(Scan scan) throws IOException {
602    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
603    return Bytes.toString(Base64.getEncoder().encode(proto.toByteArray()));
604  }
605
606  /**
607   * Converts the given Base64 string back into a Scan instance.
608   *
609   * @param base64  The scan details.
610   * @return The newly created Scan instance.
611   * @throws IOException When reading the scan instance fails.
612   */
613  public static Scan convertStringToScan(String base64) throws IOException {
614    byte [] decoded = Base64.getDecoder().decode(base64);
615    return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded));
616  }
617
618  /**
619   * Use this before submitting a TableReduce job. It will
620   * appropriately set up the JobConf.
621   *
622   * @param table  The output table.
623   * @param reducer  The reducer class to use.
624   * @param job  The current job to adjust.
625   * @throws IOException When determining the region count fails.
626   */
627  public static void initTableReducerJob(String table,
628    Class<? extends TableReducer> reducer, Job job)
629  throws IOException {
630    initTableReducerJob(table, reducer, job, null);
631  }
632
633  /**
634   * Use this before submitting a TableReduce job. It will
635   * appropriately set up the JobConf.
636   *
637   * @param table  The output table.
638   * @param reducer  The reducer class to use.
639   * @param job  The current job to adjust.
640   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
641   * default partitioner.
642   * @throws IOException When determining the region count fails.
643   */
644  public static void initTableReducerJob(String table,
645    Class<? extends TableReducer> reducer, Job job,
646    Class partitioner) throws IOException {
647    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
648  }
649
650  /**
651   * Use this before submitting a TableReduce job. It will
652   * appropriately set up the JobConf.
653   *
654   * @param table  The output table.
655   * @param reducer  The reducer class to use.
656   * @param job  The current job to adjust.  Make sure the passed job is
657   * carrying all necessary HBase configuration.
658   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
659   * default partitioner.
660   * @param quorumAddress Distant cluster to write to; default is null for
661   * output to the cluster that is designated in <code>hbase-site.xml</code>.
662   * Set this String to the zookeeper ensemble of an alternate remote cluster
663   * when you would have the reduce write a cluster that is other than the
664   * default; e.g. copying tables between clusters, the source would be
665   * designated by <code>hbase-site.xml</code> and this param would have the
666   * ensemble address of the remote cluster.  The format to pass is particular.
667   * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
668   *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
669   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
670   * @param serverClass redefined hbase.regionserver.class
671   * @param serverImpl redefined hbase.regionserver.impl
672   * @throws IOException When determining the region count fails.
673   */
674  public static void initTableReducerJob(String table,
675    Class<? extends TableReducer> reducer, Job job,
676    Class partitioner, String quorumAddress, String serverClass,
677    String serverImpl) throws IOException {
678    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
679        serverClass, serverImpl, true);
680  }
681
682  /**
683   * Use this before submitting a TableReduce job. It will
684   * appropriately set up the JobConf.
685   *
686   * @param table  The output table.
687   * @param reducer  The reducer class to use.
688   * @param job  The current job to adjust.  Make sure the passed job is
689   * carrying all necessary HBase configuration.
690   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
691   * default partitioner.
692   * @param quorumAddress Distant cluster to write to; default is null for
693   * output to the cluster that is designated in <code>hbase-site.xml</code>.
694   * Set this String to the zookeeper ensemble of an alternate remote cluster
695   * when you would have the reduce write a cluster that is other than the
696   * default; e.g. copying tables between clusters, the source would be
697   * designated by <code>hbase-site.xml</code> and this param would have the
698   * ensemble address of the remote cluster.  The format to pass is particular.
699   * Pass <code> &lt;hbase.zookeeper.quorum&gt;:&lt;
700   *             hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;
701   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
702   * @param serverClass redefined hbase.regionserver.class
703   * @param serverImpl redefined hbase.regionserver.impl
704   * @param addDependencyJars upload HBase jars and jars for any of the configured
705   *           job classes via the distributed cache (tmpjars).
706   * @throws IOException When determining the region count fails.
707   */
708  public static void initTableReducerJob(String table,
709    Class<? extends TableReducer> reducer, Job job,
710    Class partitioner, String quorumAddress, String serverClass,
711    String serverImpl, boolean addDependencyJars) throws IOException {
712
713    Configuration conf = job.getConfiguration();
714    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
715    job.setOutputFormatClass(TableOutputFormat.class);
716    if (reducer != null) job.setReducerClass(reducer);
717    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
718    conf.setStrings("io.serializations", conf.get("io.serializations"),
719        MutationSerialization.class.getName(), ResultSerialization.class.getName());
720    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
721    if (quorumAddress != null) {
722      // Calling this will validate the format
723      ZKConfig.validateClusterKey(quorumAddress);
724      conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
725    }
726    if (serverClass != null && serverImpl != null) {
727      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
728      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
729    }
730    job.setOutputKeyClass(ImmutableBytesWritable.class);
731    job.setOutputValueClass(Writable.class);
732    if (partitioner == HRegionPartitioner.class) {
733      job.setPartitionerClass(HRegionPartitioner.class);
734      int regions = getRegionCount(conf, TableName.valueOf(table));
735      if (job.getNumReduceTasks() > regions) {
736        job.setNumReduceTasks(regions);
737      }
738    } else if (partitioner != null) {
739      job.setPartitionerClass(partitioner);
740    }
741
742    if (addDependencyJars) {
743      addDependencyJars(job);
744    }
745
746    initCredentials(job);
747  }
748
749  /**
750   * Ensures that the given number of reduce tasks for the given job
751   * configuration does not exceed the number of regions for the given table.
752   *
753   * @param table  The table to get the region count for.
754   * @param job  The current job to adjust.
755   * @throws IOException When retrieving the table details fails.
756   */
757  public static void limitNumReduceTasks(String table, Job job) throws IOException {
758    int regions = getRegionCount(job.getConfiguration(), TableName.valueOf(table));
759    if (job.getNumReduceTasks() > regions) {
760      job.setNumReduceTasks(regions);
761    }
762  }
763
764  /**
765   * Sets the number of reduce tasks for the given job configuration to the
766   * number of regions the given table has.
767   *
768   * @param table  The table to get the region count for.
769   * @param job  The current job to adjust.
770   * @throws IOException When retrieving the table details fails.
771   */
772  public static void setNumReduceTasks(String table, Job job) throws IOException {
773    job.setNumReduceTasks(getRegionCount(job.getConfiguration(), TableName.valueOf(table)));
774  }
775
776  /**
777   * Sets the number of rows to return and cache with each scanner iteration.
778   * Higher caching values will enable faster mapreduce jobs at the expense of
779   * requiring more heap to contain the cached rows.
780   *
781   * @param job The current job to adjust.
782   * @param batchSize The number of rows to return in batch with each scanner
783   * iteration.
784   */
785  public static void setScannerCaching(Job job, int batchSize) {
786    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
787  }
788
789  /**
790   * Add HBase and its dependencies (only) to the job configuration.
791   * <p>
792   * This is intended as a low-level API, facilitating code reuse between this
793   * class and its mapred counterpart. It also of use to external tools that
794   * need to build a MapReduce job that interacts with HBase but want
795   * fine-grained control over the jars shipped to the cluster.
796   * </p>
797   * @param conf The Configuration object to extend with dependencies.
798   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
799   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
800   */
801  public static void addHBaseDependencyJars(Configuration conf) throws IOException {
802    addDependencyJarsForClasses(conf,
803      // explicitly pull a class from each module
804      org.apache.hadoop.hbase.HConstants.class,                      // hbase-common
805      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol
806      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded
807      org.apache.hadoop.hbase.client.Put.class,                      // hbase-client
808      org.apache.hadoop.hbase.ipc.RpcServer.class,                   // hbase-server
809      org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
810      org.apache.hadoop.hbase.mapreduce.JobUtil.class,               // hbase-hadoop2-compat
811      org.apache.hadoop.hbase.mapreduce.TableMapper.class,           // hbase-mapreduce
812      org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class,  // hbase-metrics
813      org.apache.hadoop.hbase.metrics.Snapshot.class,                // hbase-metrics-api
814      org.apache.hadoop.hbase.replication.ReplicationUtils.class,    // hbase-replication
815      org.apache.hadoop.hbase.http.HttpServer.class,                 // hbase-http
816      org.apache.hadoop.hbase.procedure2.Procedure.class,            // hbase-procedure
817      org.apache.hadoop.hbase.zookeeper.ZKWatcher.class,             // hbase-zookeeper
818      org.apache.hbase.thirdparty.com.google.common.collect.Lists.class, // hb-shaded-miscellaneous
819      org.apache.hbase.thirdparty.com.google.gson.GsonBuilder.class, // hbase-shaded-gson
820      org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.class, // hb-sh-protobuf
821      org.apache.hbase.thirdparty.io.netty.channel.Channel.class,    // hbase-shaded-netty
822      org.apache.zookeeper.ZooKeeper.class,                          // zookeeper
823      com.google.protobuf.Message.class,                             // protobuf
824      org.apache.htrace.core.Tracer.class,                           // htrace
825      com.codahale.metrics.MetricRegistry.class,                     // metrics-core
826      org.apache.commons.lang3.ArrayUtils.class);                    // commons-lang
827  }
828
829  /**
830   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
831   * Also exposed to shell scripts via `bin/hbase mapredcp`.
832   */
833  public static String buildDependencyClasspath(Configuration conf) {
834    if (conf == null) {
835      throw new IllegalArgumentException("Must provide a configuration object.");
836    }
837    Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars"));
838    if (paths.isEmpty()) {
839      throw new IllegalArgumentException("Configuration contains no tmpjars.");
840    }
841    StringBuilder sb = new StringBuilder();
842    for (String s : paths) {
843      // entries can take the form 'file:/path/to/file.jar'.
844      int idx = s.indexOf(":");
845      if (idx != -1) s = s.substring(idx + 1);
846      if (sb.length() > 0) sb.append(File.pathSeparator);
847      sb.append(s);
848    }
849    return sb.toString();
850  }
851
852  /**
853   * Add the HBase dependency jars as well as jars for any of the configured
854   * job classes to the job configuration, so that JobClient will ship them
855   * to the cluster and add them to the DistributedCache.
856   */
857  public static void addDependencyJars(Job job) throws IOException {
858    addHBaseDependencyJars(job.getConfiguration());
859    try {
860      addDependencyJarsForClasses(job.getConfiguration(),
861          // when making changes here, consider also mapred.TableMapReduceUtil
862          // pull job classes
863          job.getMapOutputKeyClass(),
864          job.getMapOutputValueClass(),
865          job.getInputFormatClass(),
866          job.getOutputKeyClass(),
867          job.getOutputValueClass(),
868          job.getOutputFormatClass(),
869          job.getPartitionerClass(),
870          job.getCombinerClass());
871    } catch (ClassNotFoundException e) {
872      throw new IOException(e);
873    }
874  }
875
876  /**
877   * Add the jars containing the given classes to the job's configuration
878   * such that JobClient will ship them to the cluster and add them to
879   * the DistributedCache.
880   * @deprecated since 1.3.0 and will be removed in 3.0.0. Use {@link #addDependencyJars(Job)}
881   *   instead.
882   * @see #addDependencyJars(Job)
883   * @see <a href="https://issues.apache.org/jira/browse/HBASE-8386">HBASE-8386</a>
884   */
885  @Deprecated
886  public static void addDependencyJars(Configuration conf,
887      Class<?>... classes) throws IOException {
888    LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it"
889             + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " +
890             "instead. See HBASE-8386 for more details.");
891    addDependencyJarsForClasses(conf, classes);
892  }
893
894  /**
895   * Add the jars containing the given classes to the job's configuration
896   * such that JobClient will ship them to the cluster and add them to
897   * the DistributedCache.
898   *
899   * N.B. that this method at most adds one jar per class given. If there is more than one
900   * jar available containing a class with the same name as a given class, we don't define
901   * which of those jars might be chosen.
902   *
903   * @param conf The Hadoop Configuration to modify
904   * @param classes will add just those dependencies needed to find the given classes
905   * @throws IOException if an underlying library call fails.
906   */
907  @InterfaceAudience.Private
908  public static void addDependencyJarsForClasses(Configuration conf,
909      Class<?>... classes) throws IOException {
910
911    FileSystem localFs = FileSystem.getLocal(conf);
912    Set<String> jars = new HashSet<>();
913    // Add jars that are already in the tmpjars variable
914    jars.addAll(conf.getStringCollection("tmpjars"));
915
916    // add jars as we find them to a map of contents jar name so that we can avoid
917    // creating new jars for classes that have already been packaged.
918    Map<String, String> packagedClasses = new HashMap<>();
919
920    // Add jars containing the specified classes
921    for (Class<?> clazz : classes) {
922      if (clazz == null) continue;
923
924      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
925      if (path == null) {
926        LOG.warn("Could not find jar for class " + clazz +
927                 " in order to ship it to the cluster.");
928        continue;
929      }
930      if (!localFs.exists(path)) {
931        LOG.warn("Could not validate jar file " + path + " for class "
932                 + clazz);
933        continue;
934      }
935      jars.add(path.toString());
936    }
937    if (jars.isEmpty()) return;
938
939    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
940  }
941
942  /**
943   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
944   * a directory in the classpath, it creates a Jar on the fly with the
945   * contents of the directory and returns the path to that Jar. If a Jar is
946   * created, it is created in the system temporary directory. Otherwise,
947   * returns an existing jar that contains a class of the same name. Maintains
948   * a mapping from jar contents to the tmp jar created.
949   * @param my_class the class to find.
950   * @param fs the FileSystem with which to qualify the returned path.
951   * @param packagedClasses a map of class name to path.
952   * @return a jar file that contains the class.
953   * @throws IOException
954   */
955  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
956      Map<String, String> packagedClasses)
957  throws IOException {
958    // attempt to locate an existing jar for the class.
959    String jar = findContainingJar(my_class, packagedClasses);
960    if (null == jar || jar.isEmpty()) {
961      jar = getJar(my_class);
962      updateMap(jar, packagedClasses);
963    }
964
965    if (null == jar || jar.isEmpty()) {
966      return null;
967    }
968
969    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
970    return new Path(jar).makeQualified(fs.getUri(), fs.getWorkingDirectory());
971  }
972
973  /**
974   * Add entries to <code>packagedClasses</code> corresponding to class files
975   * contained in <code>jar</code>.
976   * @param jar The jar who's content to list.
977   * @param packagedClasses map[class -> jar]
978   */
979  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
980    if (null == jar || jar.isEmpty()) {
981      return;
982    }
983    ZipFile zip = null;
984    try {
985      zip = new ZipFile(jar);
986      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
987        ZipEntry entry = iter.nextElement();
988        if (entry.getName().endsWith("class")) {
989          packagedClasses.put(entry.getName(), jar);
990        }
991      }
992    } finally {
993      if (null != zip) zip.close();
994    }
995  }
996
997  /**
998   * Find a jar that contains a class of the same name, if any. It will return
999   * a jar file, even if that is not the first thing on the class path that
1000   * has a class with the same name. Looks first on the classpath and then in
1001   * the <code>packagedClasses</code> map.
1002   * @param my_class the class to find.
1003   * @return a jar file that contains the class, or null.
1004   * @throws IOException
1005   */
1006  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
1007      throws IOException {
1008    ClassLoader loader = my_class.getClassLoader();
1009
1010    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
1011
1012    if (loader != null) {
1013      // first search the classpath
1014      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
1015        URL url = itr.nextElement();
1016        if ("jar".equals(url.getProtocol())) {
1017          String toReturn = url.getPath();
1018          if (toReturn.startsWith("file:")) {
1019            toReturn = toReturn.substring("file:".length());
1020          }
1021          // URLDecoder is a misnamed class, since it actually decodes
1022          // x-www-form-urlencoded MIME type rather than actual
1023          // URL encoding (which the file path has). Therefore it would
1024          // decode +s to ' 's which is incorrect (spaces are actually
1025          // either unencoded or encoded as "%20"). Replace +s first, so
1026          // that they are kept sacred during the decoding process.
1027          toReturn = toReturn.replaceAll("\\+", "%2B");
1028          toReturn = URLDecoder.decode(toReturn, "UTF-8");
1029          return toReturn.replaceAll("!.*$", "");
1030        }
1031      }
1032    }
1033
1034    // now look in any jars we've packaged using JarFinder. Returns null when
1035    // no jar is found.
1036    return packagedClasses.get(class_file);
1037  }
1038
1039  /**
1040   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
1041   * configuration contexts (HBASE-8140) and also for testing on MRv2.
1042   * check if we have HADOOP-9426.
1043   * @param my_class the class to find.
1044   * @return a jar file that contains the class, or null.
1045   */
1046  private static String getJar(Class<?> my_class) {
1047    String ret = null;
1048    try {
1049      ret = JarFinder.getJar(my_class);
1050    } catch (Exception e) {
1051      // toss all other exceptions, related to reflection failure
1052      throw new RuntimeException("getJar invocation failed.", e);
1053    }
1054
1055    return ret;
1056  }
1057
1058  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
1059    try (Connection conn = ConnectionFactory.createConnection(conf);
1060      RegionLocator locator = conn.getRegionLocator(tableName)) {
1061      return locator.getAllRegionLocations().size();
1062    }
1063  }
1064}