/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
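   *
   * <p>A minimal usage sketch; the table name, column list, and mapper class below are
   * illustrative only:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableMapJob("myTable", "info:name info:age",
   *   MyTableMap.class, ImmutableBytesWritable.class, Result.class, job);
   * }</pre>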
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up the
   * JobConf, optionally skipping the upload of dependency jars.
   *
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
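   *
   * <p>For example, given a configured {@code JobConf job}, to read through a custom input
   * format while skipping the upload of dependency jars (class names are illustrative):
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableMapJob("myTable", "cf:col", MyTableMap.class,
   *   ImmutableBytesWritable.class, Result.class, job, false, MyTableInputFormat.class);
   * }</pre>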
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat The input format class to use.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot. It bypasses HBase servers and reads directly from snapshot files.
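   *
   * <p>A minimal sketch of wiring up the snapshot-to-scans map; the snapshot names, column
   * family, and restore path below are illustrative:
   *
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   * snapshotScans.put("snapshot1", Collections.singletonList(new Scan()));
   * snapshotScans.put("snapshot2",
   *   Collections.singletonList(new Scan().addFamily(Bytes.toBytes("cf"))));
   * TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMap.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot-restore"));
   * }</pre>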
   *
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust.  Make sure the passed job is
   *                          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *                          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to copy the snapshot files into. The current
   *                          user should have write permissions to this directory, and it should
   *                          not be a subdirectory of rootdir. After the job is finished, the
   *                          restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
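   *
   * <p>An illustrative call, given a configured {@code JobConf job}; the snapshot name, columns,
   * and restore path are examples only:
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableSnapshotMapJob("mySnapshot", "cf:col1 cf:col2",
   *   MyTableMap.class, ImmutableBytesWritable.class, Result.class, job,
   *   true, new Path("hdfs:///tmp/snapshot-restore"));
   * }</pre>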
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   * should have write permissions to this directory, and it should not be a subdirectory of
   * rootdir. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
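   *
   * <p>For example, to generate two input splits per region with a uniform split algorithm
   * (all argument values here are illustrative), given a configured {@code JobConf jobConf}:
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableSnapshotMapJob("mySnapshot", "cf:col",
   *   MyTableMap.class, ImmutableBytesWritable.class, Result.class, jobConf,
   *   true, new Path("hdfs:///tmp/snapshot-restore"),
   *   new RegionSplitter.UniformSplit(), 2);
   * }</pre>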
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param jobConf  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   * should have write permissions to this directory, and it should not be a subdirectory of
   * rootdir. After the job is finished, the restore directory can be deleted.
   * @param splitAlgo The split algorithm to use when generating the input splits.
   * @param numSplitsPerRegion The number of input splits to generate per region.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
                                             Class<? extends TableMap> mapper,
                                             Class<?> outputKeyClass,
                                             Class<?> outputValueClass, JobConf jobConf,
                                             boolean addDependencyJars, Path tmpRestoreDir,
                                             RegionSplitter.SplitAlgorithm splitAlgo,
                                             int numSplitsPerRegion)
          throws IOException {
    TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
            numSplitsPerRegion);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
            addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
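   *
   * <p>A minimal sketch, given a configured {@code JobConf job}; the output table and reducer
   * class are illustrative:
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableReduceJob("myOutputTable", MyTableReduce.class, job);
   * }</pre>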
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
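   *
   * <p>For example, to partition output by target region and cap the reducer count at the
   * table's region count (the table and reducer names are illustrative):
   *
   * <pre>{@code
   * TableMapReduceUtil.initTableReduceJob("myOutputTable", MyTableReduce.class, job,
   *   HRegionPartitioner.class, true);
   * }</pre>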
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

  /**
   * Sets up security credentials for the job: if Hadoop security is enabled, propagates the
   * delegation token location from the launcher environment into the job configuration, and if
   * HBase security is enabled, obtains an HBase authentication token for the current user and
   * adds it to the job.
   *
   * @param job  The current job configuration to adjust.
   * @throws IOException When obtaining the authentication token fails.
   */
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job) throws IOException {
    int regions = getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job) throws IOException {
    job.setNumReduceTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job) throws IOException {
    job.setNumMapTasks(getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(
      job,
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }

  private static int getRegionCount(Configuration conf, TableName tableName) throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      RegionLocator locator = conn.getRegionLocator(tableName)) {
      return locator.getAllRegionLocations().size();
    }
  }
}