/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Utility for {@link TableMap} and {@link TableReduce}.
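 * <p>
 * A minimal usage sketch (the {@code MyTableMap} and {@code MyTableReduce} classes, table name,
 * and column list below are hypothetical placeholders, not part of this API):
 * </p>
 *
 * <pre>{@code
 * JobConf job = new JobConf(HBaseConfiguration.create());
 * // Scan column family "cf" of table "mytable" with the mapper, then write the
 * // reducer's Puts back to "mytable".
 * TableMapReduceUtil.initTableMapJob("mytable", "cf:", MyTableMap.class,
 *   ImmutableBytesWritable.class, Put.class, job);
 * TableMapReduceUtil.initTableReduceJob("mytable", MyTableReduce.class, job);
 * JobClient.runJob(job);
 * }</pre>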
 */
@InterfaceAudience.Public
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   * @param table            The table name to read from.
   * @param columns          The columns to scan.
   * @param mapper           The mapper class to use.
   * @param outputKeyClass   The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job              The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true,
      TableInputFormat.class);
  }

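  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf. Same as
   * {@link #initTableMapJob(String, String, Class, Class, Class, JobConf)}, but lets the caller
   * control whether dependency jars are shipped with the job.
   * @param table             The table name to read from.
   * @param columns           The columns to scan.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   */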
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the JobConf.
   * @param table             The table name to read from.
   * @param columns           The columns to scan.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param inputFormat       The input format class to use, e.g. {@link TableInputFormat}.
   */
  public static void initTableMapJob(String table, String columns, Class<? extends TableMap> mapper,
    Class<?> outputKeyClass, Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace? really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans per
   * snapshot. It bypasses HBase servers and reads directly from snapshot files.
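   * <p>
   * A minimal sketch of building the scan map (the snapshot names, mapper class, and restore
   * directory below are hypothetical placeholders):
   * </p>
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   * snapshotScans.put("snapshot1", Collections.singletonList(new Scan()));
   * snapshotScans.put("snapshot2", Arrays.asList(new Scan(), new Scan()));
   * TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMap.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot-restore"));
   * }</pre>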
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. The current
   *                          user should have write permissions to this directory, and it should
   *                          not be a subdirectory of rootdir. The restore directory can be
   *                          deleted once the job is finished.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers and reads
   * directly from snapshot files.
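   * <p>
   * A minimal sketch (the snapshot name, column list, mapper class, and restore directory below
   * are hypothetical placeholders):
   * </p>
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableSnapshotMapJob("mysnapshot", "cf:", MyTableMap.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot-restore"));
   * }</pre>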
   * @param snapshotName      The name of the snapshot (of a table) to read from.
   * @param columns           The columns to scan.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust. Make sure the passed job is carrying all
   *                          necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. The current
   *                          user should have write permissions to this directory, and it should
   *                          not be a subdirectory of rootdir. The restore directory can be
   *                          deleted once the job is finished.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers and reads
   * directly from snapshot files.
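   * <p>
   * A minimal sketch that generates two input splits per region with a uniform split algorithm
   * (the snapshot name, column list, mapper class, and restore directory below are hypothetical
   * placeholders):
   * </p>
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableSnapshotMapJob("mysnapshot", "cf:", MyTableMap.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot-restore"),
   *   new RegionSplitter.UniformSplit(), 2);
   * }</pre>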
   * @param snapshotName       The name of the snapshot (of a table) to read from.
   * @param columns            The columns to scan.
   * @param mapper             The mapper class to use.
   * @param outputKeyClass     The class of the output key.
   * @param outputValueClass   The class of the output value.
   * @param jobConf            The current job to adjust. Make sure the passed job is carrying all
   *                           necessary HBase configuration.
   * @param addDependencyJars  upload HBase jars and jars for any of the configured job classes via
   *                           the distributed cache (tmpjars).
   * @param tmpRestoreDir      a temporary directory to copy the snapshot files into. The current
   *                           user should have write permissions to this directory, and it should
   *                           not be a subdirectory of rootdir. The restore directory can be
   *                           deleted once the job is finished.
   * @param splitAlgo          the algorithm used to split each region into input splits
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
    Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
    JobConf jobConf, boolean addDependencyJars, Path tmpRestoreDir,
    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
      numSplitsPerRegion);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table   The output table.
   * @param reducer The reducer class to use.
   * @param job     The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job) throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
   * @param table       The output table.
   * @param reducer     The reducer class to use.
   * @param job         The current job configuration to adjust.
   * @param partitioner Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job, Class partitioner) throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will appropriately set up the JobConf.
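   * <p>
   * For example, to partition output by the target table's regions with {@link HRegionPartitioner}
   * (the table name and reducer class below are hypothetical placeholders); note that in this case
   * the number of reduce tasks is capped at the table's region count:
   * </p>
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableReduceJob("mytable", MyTableReduce.class, job,
   *   HRegionPartitioner.class, true);
   * }</pre>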
   * @param table             The output table.
   * @param reducer           The reducer class to use.
   * @param job               The current job configuration to adjust.
   * @param partitioner       Partitioner to use. Pass <code>null</code> to use default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
   *                          the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table, Class<? extends TableReduce> reducer,
    JobConf job, Class partitioner, boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job configuration does not exceed
   * the number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job configuration does not exceed the
   * number of regions for the given table.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the number of regions the
   * given table has.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job) throws IOException {
    job.setNumReduceTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the number of regions the given
   * table has.
   * @param table The table to get the region count for.
   * @param job   The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job) throws IOException {
    job.setNumMapTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration. Higher caching values
   * will enable faster mapreduce jobs at the expense of requiring more heap to contain the cached
   * rows.
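   * <p>
   * For example, to cache 500 rows per scanner round trip (500 is only an illustrative value):
   * </p>
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.setScannerCaching(job, 500);
   * }</pre>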
   * @param job       The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(job,
      job.getMapOutputKeyClass(), job.getMapOutputValueClass(), job.getOutputKeyClass(),
      job.getOutputValueClass(), job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}