/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility for {@link TableMap} and {@link TableReduce}.
 */
@InterfaceAudience.Public
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {
  private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   * @param table            The table name to read from.
   * @param columns          The columns to scan.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true,
      TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf. See
   * {@link #initTableMapJob(String, String, Class, Class, Class, JobConf, boolean, Class)} for
   * parameter details.
   */
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   * @param table             The table name to read from.
   * @param columns           The columns to scan.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormat       The input format class to use for the job.
   */
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        LOG.error("IOException encountered while adding dependency jars", e);
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace? really?
      LOG.error("IOException encountered while initializing credentials", ioe);
    }
  }
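
  /*
   * A minimal usage sketch (illustrative only; the table name, the column list, and the
   * MyTableMap class are hypothetical placeholders, not part of this API):
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create());
   *   job.setJobName("scan-mytable");
   *   TableMapReduceUtil.initTableMapJob("mytable", "info:name info:age", MyTableMap.class,
   *     ImmutableBytesWritable.class, Result.class, job);
   *   // configure the reduce side (or set zero reduces), then submit via JobClient.runJob(job).
   */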

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans per
   * snapshot. It bypasses hbase servers and reads directly from snapshot files.
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
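
  /*
   * A rough sketch of wiring up a multi-snapshot scan job (illustrative only; the snapshot names,
   * restore directory, and MyTableMap class are hypothetical placeholders):
   *
   *   Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   *   snapshotScans.put("snapshot1", Collections.singletonList(new Scan()));
   *   snapshotScans.put("snapshot2", Collections.singletonList(new Scan()));
   *   TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMap.class,
   *     ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot_restore"));
   */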

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
   * directly from snapshot files.
   * @param snapshotName      The name of the snapshot (of a table) to read from.
   * @param columns           The columns to scan.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. The current
   *                          user should have write permissions to this directory, and it should
   *                          not be a subdirectory of rootdir. The restore directory can be
   *                          deleted after the job is finished.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }
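
  /*
   * A minimal sketch of reading a snapshot instead of the live table (illustrative only; the
   * snapshot name, column list, restore directory, and MyTableMap class are hypothetical
   * placeholders). The restore directory must be writable by the current user and must not sit
   * under the HBase root directory:
   *
   *   TableMapReduceUtil.initTableSnapshotMapJob("my_snapshot", "info:name", MyTableMap.class,
   *     ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot_restore"));
   */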

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers and reads
   * directly from snapshot files.
   * @param snapshotName       The name of the snapshot (of a table) to read from.
   * @param columns            The columns to scan.
   * @param mapper             The mapper class to use.
   * @param outputKeyClass     The class of the output key.
   * @param outputValueClass   The class of the output value.
   * @param jobConf            The current job to adjust. Make sure the passed job is carrying all
   *                           necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job classes via
   *                           the distributed cache (tmpjars).
   * @param tmpRestoreDir      a temporary directory to copy the snapshot files into. The current
   *                           user should have write permissions to this directory, and it should
   *                           not be a subdirectory of rootdir. The restore directory can be
   *                           deleted after the job is finished.
   * @param splitAlgo          the split algorithm used to divide a region into input splits
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf jobConf, boolean addDependencyJars, Path tmpRestoreDir,
    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
  }
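
  /*
   * Sketch of the variant that generates several input splits per region (illustrative only; the
   * snapshot name and restore directory are hypothetical, and 2 splits per region is just an
   * example value):
   *
   *   TableMapReduceUtil.initTableSnapshotMapJob("my_snapshot", "info:name", MyTableMap.class,
   *     ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot_restore"),
   *     new RegionSplitter.UniformSplit(), 2);
   */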

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table   The output table.
   * @param reducer The reducer class to use.
   * @param job     The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job) throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table       The output table.
   * @param reducer     The reducer class to use.
   * @param job         The current job configuration to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use the default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job, Class partitioner) throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job configuration to adjust.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use the default
   *                          partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job, Class partitioner, boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }
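
  /*
   * A minimal sketch of configuring the reduce side (illustrative only; the output table name and
   * the MyTableReduce class are hypothetical placeholders). Passing HRegionPartitioner caps the
   * number of reducers at the output table's region count:
   *
   *   TableMapReduceUtil.initTableReduceJob("output_table", MyTableReduce.class, job,
   *     HRegionPartitioner.class);
   */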

  /**
   * Sets up the job's credentials for accessing a secure cluster: propagates delegation token
   * settings from the launching process and, if HBase security is enabled, obtains an HBase
   * authentication token for the current user and adds it to the job.
   * @param job The current job configuration to adjust.
   * @throws IOException When obtaining the authentication token fails.
   */
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // obtain an authentication token for the current user and add it to the job
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        LOG.error("Interrupted obtaining user authentication token", ie);
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }
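
  /*
   * Note: initCredentials is already called by initTableMapJob and initTableReduceJob. A job that
   * configures its HBase access by hand on a secure cluster can invoke it directly before
   * submission (a sketch, assuming security is enabled in the job's configuration):
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create());
   *   TableMapReduceUtil.initCredentials(job);
   */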

  /**
   * Ensures that the given number of reduce tasks for the given job configuration does not exceed
   * the number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job configuration does not exceed the
   * number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job) throws IOException {
    job.setNumReduceTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job) throws IOException {
    job.setNumMapTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration. Higher caching values
   * will enable faster mapreduce jobs at the expense of requiring more heap to contain the cached
   * rows.
   * @param job       The current job configuration to adjust.
   * @param batchSize The number of rows to return and cache with each scanner iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
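
  /*
   * Example (the value 500 is arbitrary): larger caching values mean fewer round trips to the
   * region servers per map task, at the cost of more client-side heap per open scanner:
   *
   *   TableMapReduceUtil.setScannerCaching(job, 500);
   */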

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(job,
      job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getOutputKeyClass(),
      job.getOutputValueClass(), job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}