/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
 * for later bulk importing.
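 * <p>
 * Example invocation (a sketch; the input and output paths below are illustrative, and the
 * {@code hfile.bulk.output} option is required):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob \
 *   -Dhfile.bulk.output=hdfs:///tmp/hfile-splitter-out \
 *   hdfs:///backup/hfiles/default/t1 t1
 * </pre>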
 */
@InterfaceAudience.Private
public class MapReduceHFileSplitterJob extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(MapReduceHFileSplitterJob.class);
  final static String NAME = "HFileSplitterJob";
  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
  public final static String TABLES_KEY = "hfile.input.tables";
  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public MapReduceHFileSplitterJob() {
  }

  protected MapReduceHFileSplitterJob(final Configuration c) {
    super(c);
  }

  /**
   * A mapper that just writes out cells. This one can be used together with
   * {@link CellSortReducer}.
   */
  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {

    @Override
    public void map(NullWritable key, Cell value, Context context)
      throws IOException, InterruptedException {
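      // Key the cell by its row so cells are grouped and sorted per row in CellSortReducer.
      // Wrapping the cell as a MapReduceExtendedCell lets extended fields such as the sequenceId
      // survive serialization (see the ExtendedCellSerialization setup in createSubmittableJob).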
      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
        new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(value)));
    }

    @Override
    public void setup(Context context) throws IOException {
      // do nothing
    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    Configuration conf = getConf();
    String inputDirs = args[0];
    String tabName = args[1];
    conf.setStrings(TABLES_KEY, tabName);
    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
    Job job = Job.getInstance(conf,
      conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
    // MapReduceHFileSplitter needs ExtendedCellSerialization so that sequenceId can be propagated
    // when sorting cells in CellSortReducer
    job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
      true);
    job.setJarByClass(MapReduceHFileSplitterJob.class);
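    // HFileInputFormat reads the input HFiles directly and feeds each cell to the mapper.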
    job.setInputFormatClass(HFileInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
      LOG.debug("Adding incremental load job, output=" + hfileOutPath + ", input=" + inputDirs);
      TableName tableName = TableName.valueOf(tabName);
      job.setMapperClass(HFileCellMapper.class);
      job.setReducerClass(CellSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(MapReduceExtendedCell.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = getRegionLocator(conf, conn, tableName)) {
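        // Configure partitioning and HFile output against the target table's current region
        // boundaries so the generated HFiles can be bulk loaded directly.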
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      }
      LOG.debug("Successfully configured incremental load job");

      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
    } else {
      throw new IOException("No bulk output directory specified");
    }
    return job;
  }

  /**
   * Print usage
   * @param errorMsg Error message. Can be null.
   */
  private void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
    System.err.println("Read all HFiles for <table> and split them into <table> region boundaries.");
    System.err.println("<table>  table to load.\n");
    System.err.println("To generate HFiles for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("Other options:");
    System.err.println("   -D " + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the HFile splitter");
    System.err.println("For performance also consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    Job job = createSubmittableJob(args);
    int result = job.waitForCompletion(true) ? 0 : 1;
    return result;
  }

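  /**
   * Returns the {@link RegionLocator} used to derive split points: a {@link SnapshotRegionLocator}
   * when the configuration asks for snapshot-based region boundaries, otherwise the live table's
   * region locator.
   */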
  private static RegionLocator getRegionLocator(Configuration conf, Connection conn,
    TableName table) throws IOException {
    if (SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, table)) {
      return SnapshotRegionLocator.create(conf, table);
    }

    return conn.getRegionLocator(table);
  }
}