/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@InterfaceAudience.Public
@SuppressWarnings({ "rawtypes", "unchecked" })
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
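   *
   * <p>A minimal usage sketch; the table name, column specification, and {@code MyTableMap}
   * mapper are illustrative placeholders, not part of this API:
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * job.setJobName("example-table-map");
   * TableMapReduceUtil.initTableMapJob("myTable", "info:qual", MyTableMap.class,
   *   ImmutableBytesWritable.class, Result.class, job);
   * }</pre>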
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      true, TableInputFormat.class);
  }

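  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf. This variant delegates to
   * {@link #initTableMapJob(String, String, Class, Class, Class, JobConf, boolean, Class)}
   * with {@link TableInputFormat} as the input format.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */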
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormat The input format class to use.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
    Class<? extends InputFormat> inputFormat) {

    job.setInputFormat(inputFormat);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      initCredentials(job);
    } catch (IOException ioe) {
      // just spit out the stack trace?  really?
      ioe.printStackTrace();
    }
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot.
   * It bypasses HBase servers and reads directly from snapshot files.
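   *
   * <p>A minimal usage sketch; the snapshot names, {@code MyTableMap} mapper, and restore
   * directory are illustrative placeholders:
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   * snapshotScans.put("snapshot1", Collections.singletonList(new Scan()));
   * snapshotScans.put("snapshot2", Collections.singletonList(new Scan()));
   * TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, MyTableMap.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot-restore"));
   * }</pre>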
   *
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust.  Make sure the passed job is
   *                          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *                          job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir     a temporary directory to restore the snapshots into. The current
   *                          user should have write permissions to this directory, and it
   *                          should not be a subdirectory of the HBase root directory.
   * @throws IOException      When setting up the job fails.
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir);

    job.setInputFormat(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    if (addDependencyJars) {
      addDependencyJars(job);
    }

    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
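   *
   * <p>A minimal usage sketch; the snapshot name, column specification, {@code MyTableMap}
   * mapper, and restore directory are illustrative placeholders:
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableSnapshotMapJob("mySnapshot", "info:qual", MyTableMap.class,
   *   ImmutableBytesWritable.class, Result.class, job, true, new Path("/tmp/snapshot-restore"));
   * }</pre>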
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   * should have write permissions to this directory, and it should not be a subdirectory of the
   * HBase root directory. After the job is finished, the restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
      Class<? extends TableMap> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, JobConf job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job);
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses HBase servers
   * and reads directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param jobConf  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. The current user
   * should have write permissions to this directory, and it should not be a subdirectory of the
   * HBase root directory. After the job is finished, the restore directory can be deleted.
   * @param splitAlgo split algorithm to use when generating multiple input splits per region
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapJob(String snapshotName, String columns,
                                             Class<? extends TableMap> mapper,
                                             Class<?> outputKeyClass,
                                             Class<?> outputValueClass, JobConf jobConf,
                                             boolean addDependencyJars, Path tmpRestoreDir,
                                             RegionSplitter.SplitAlgorithm splitAlgo,
                                             int numSplitsPerRegion)
          throws IOException {
    TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
            numSplitsPerRegion);
    initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
            addDependencyJars, TableSnapshotInputFormat.class);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
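   *
   * <p>A minimal usage sketch; the table name and {@code MyTableReduce} reducer are
   * illustrative placeholders:
   * <pre>{@code
   * JobConf job = new JobConf(HBaseConfiguration.create());
   * TableMapReduceUtil.initTableReduceJob("myOutputTable", MyTableReduce.class, job);
   * }</pre>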
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use the
   * default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setStrings("io.serializations", job.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions =
        MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    initCredentials(job);
  }

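  /**
   * Obtains the authentication tokens needed by the job's tasks: when Hadoop security is
   * enabled it propagates any delegation token file supplied by the launcher, and when HBase
   * security is enabled it requests an HBase token for the current user and stores it in the
   * job's credentials.
   *
   * @param job The job configuration to receive the credentials.
   * @throws IOException When obtaining the HBase token fails.
   */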
  public static void initCredentials(JobConf job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job);
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      Connection conn = ConnectionFactory.createConnection(job);
      try {
        // login the server principal (if using secure Hadoop)
        User user = userProvider.getCurrent();
        TokenUtil.addTokenForJob(conn, job, user);
      } catch (InterruptedException ie) {
        ie.printStackTrace();
        Thread.currentThread().interrupt();
      } finally {
        conn.close();
      }
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  // Used by tests.
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job),
      TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
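   *
   * <p>For example, {@code TableMapReduceUtil.setScannerCaching(job, 500)} fetches up to 500
   * rows per scanner round trip, trading client-side heap for fewer calls to the region servers.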
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return and cache with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job);
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(
      job,
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}