/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
 * for later bulk importing.
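 * <p>
 * A typical command-line invocation might look like the following; the input directory, output
 * path, and table name here are illustrative:
 * </p>
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob \
 *   -Dhfile.bulk.output=hdfs:///tmp/hfile-split-out \
 *   hdfs:///data/hfiles/mytable mytable
 * </pre>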
 */
@InterfaceAudience.Private
public class MapReduceHFileSplitterJob extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(MapReduceHFileSplitterJob.class);
  final static String NAME = "HFileSplitterJob";
  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
  public final static String TABLES_KEY = "hfile.input.tables";
  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public MapReduceHFileSplitterJob() {
  }

  protected MapReduceHFileSplitterJob(final Configuration c) {
    super(c);
  }

  /**
   * A mapper that just writes out cells. This one can be used together with {@link CellSortReducer}
   */
  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {

    @Override
    public void map(NullWritable key, Cell value, Context context)
      throws IOException, InterruptedException {
      // Key each cell by its row so cells can be sorted and grouped per row downstream, and wrap
      // it in a MapReduceExtendedCell so it can be serialized by the MapReduce framework.
      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
        new MapReduceExtendedCell(value));
    }

    @Override
    public void setup(Context context) throws IOException {
      // do nothing
    }
  }

  /**
   * Sets up the actual job.
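   * <p>
   * A minimal programmatic sketch (the input directory, output path, and table name below are
   * illustrative):
   * </p>
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, "/tmp/hfile-split-out");
   * MapReduceHFileSplitterJob tool = new MapReduceHFileSplitterJob();
   * tool.setConf(conf);
   * Job job = tool.createSubmittableJob(new String[] { "/data/hfiles/mytable", "mytable" });
   * job.waitForCompletion(true);
   * </pre>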
   * @param args The command line parameters: the HFile input directory (or directories) followed
   *          by the name of the table to split against.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    Configuration conf = getConf();
    String inputDirs = args[0];
    String tabName = args[1];
    conf.setStrings(TABLES_KEY, tabName);
    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
    Job job = Job.getInstance(conf,
      conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
    job.setJarByClass(MapReduceHFileSplitterJob.class);
    // Read the source HFiles directly; each input record is a single cell.
    job.setInputFormatClass(HFileInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
      LOG.debug("Adding incremental load job for output " + hfileOutPath + " from " + inputDirs);
      TableName tableName = TableName.valueOf(tabName);
      job.setMapperClass(HFileCellMapper.class);
      job.setReducerClass(CellSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(MapReduceExtendedCell.class);
      // Configure the output HFiles to match the current region boundaries of the target table.
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      }
      LOG.debug("Successfully configured incremental load job");

      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
    } else {
      throw new IOException("No bulk output directory specified");
    }
    return job;
  }

  /**
   * Print usage.
   * @param errorMsg Error message. Can be null.
   */
  private void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
    System.err.println("Read all HFiles for <table> and split them at <table> region boundaries.");
    System.err.println("<table>  table to load.\n");
    System.err.println("To generate HFiles for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("Other options:");
    System.err.println("  -D" + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the HFile splitter");
    System.err.println("For performance also consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    Job job = createSubmittableJob(args);
    return job.waitForCompletion(true) ? 0 : 1;
  }
}