/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
 * for later bulk importing.
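 * <p>
 * For illustration, a hypothetical command-line invocation (paths and table name are made up)
 * might look like:
 * </p>
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob \
 *   -Dhfile.bulk.output=/tmp/hfile-splitter-out /backup/hfiles/usertable usertable
 * </pre>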
 */
@InterfaceAudience.Private
public class MapReduceHFileSplitterJob extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(MapReduceHFileSplitterJob.class);
  final static String NAME = "HFileSplitterJob";
  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
  public final static String TABLES_KEY = "hfile.input.tables";
  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public MapReduceHFileSplitterJob() {
  }

  protected MapReduceHFileSplitterJob(final Configuration c) {
    super(c);
  }

  /**
   * A mapper that just writes out cells. This one can be used together with
   * {@link CellSortReducer}.
   */
  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {

    @Override
    public void map(NullWritable key, Cell value, Context context)
      throws IOException, InterruptedException {
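      // Emit each cell keyed by its row so that CellSortReducer groups and sorts cells per row.
      // Wrapping the cell in MapReduceExtendedCell lets extended attributes such as the sequenceId
      // survive MapReduce serialization (see EXTENDED_CELL_SERIALIZATION_ENABLED_KEY below).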
      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
        new MapReduceExtendedCell(value));
    }

    @Override
    public void setup(Context context) throws IOException {
      // do nothing
    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    Configuration conf = getConf();
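    // Positional arguments (see usage()): args[0] holds the HFile input directory (or directories,
    // comma-separated) and args[1] names the table whose region boundaries drive the split.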
    String inputDirs = args[0];
    String tabName = args[1];
    conf.setStrings(TABLES_KEY, tabName);
    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
    Job job = Job.getInstance(conf,
      conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
    // MapReduceHFileSplitter needs ExtendedCellSerialization so that sequenceId can be propagated
    // when sorting cells in CellSortReducer
    job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
      true);
    job.setJarByClass(MapReduceHFileSplitterJob.class);
    job.setInputFormatClass(HFileInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
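    // The bulk output directory is mandatory: it is where the split, per-region HFiles are written
    // for a later bulk load. Without it the job has nothing to produce, hence the IOException below.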
    if (hfileOutPath != null) {
      LOG.debug("Adding incremental load job: output=" + hfileOutPath + ", input=" + inputDirs);
      TableName tableName = TableName.valueOf(tabName);
      job.setMapperClass(HFileCellMapper.class);
      job.setReducerClass(CellSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(MapReduceExtendedCell.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
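        // configureIncrementalLoad wires up HFileOutputFormat2: it installs a total-order
        // partitioner derived from the table's current region boundaries and sets one reducer
        // per region, so each reducer writes HFiles for exactly one region.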
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      }
      LOG.debug("Successfully configured incremental load job");

      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
    } else {
      throw new IOException("No bulk output directory specified");
    }
    return job;
  }

  /**
   * Prints usage.
   * @param errorMsg Error message. Can be null.
   */
  private void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
    System.err.println("Read all HFiles for <table> and split them at <table> region boundaries.");
    System.err.println("<table>  table to load.\n");
    System.err.println("To generate HFiles for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("Other options:");
    System.err.println("   -D " + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the HFile splitter");
    System.err.println("For performance also consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    Job job = createSubmittableJob(args);
    int result = job.waitForCompletion(true) ? 0 : 1;
    return result;
  }
}