/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
 * for later bulk importing.
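 * <p>
 * A typical invocation looks roughly like the following; the input directory, output path and
 * table name below are illustrative only:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob \
 *   -Dhfile.bulk.output=hdfs:///tmp/hfile-split-out /backup/hfiles/mytable mytable
 * </pre>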
 */
@InterfaceAudience.Private
public class MapReduceHFileSplitterJob extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(MapReduceHFileSplitterJob.class);
  final static String NAME = "HFileSplitterJob";
  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
  public final static String TABLES_KEY = "hfile.input.tables";
  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public MapReduceHFileSplitterJob() {
  }

  protected MapReduceHFileSplitterJob(final Configuration c) {
    super(c);
  }

  /**
   * A mapper that just writes out cells. It can be used together with {@link CellSortReducer} or,
   * when disk-based sorting is enabled, with {@link PreSortedCellsReducer}.
   */
  static class HFileCellMapper extends Mapper<NullWritable, Cell, WritableComparable<?>, Cell> {

    private boolean diskBasedSortingEnabled = false;

    @Override
    public void map(NullWritable key, Cell value, Context context)
      throws IOException, InterruptedException {
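      // Emit the cell under a key whose type depends on the sorting mode (see wrap below), and
      // wrap the value as a MapReduceExtendedCell so the full extended cell is carried through
      // the shuffle (extended cell serialization is enabled in createSubmittableJob).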
      ExtendedCell extendedCell = PrivateCellUtil.ensureExtendedCell(value);
      context.write(wrap(extendedCell), new MapReduceExtendedCell(extendedCell));
    }

    @Override
    public void setup(Context context) throws IOException {
      diskBasedSortingEnabled =
        HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
    }

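    // When disk-based sorting is on, the shuffle compares full cell keys, so the map output key
    // must carry the whole key (KeyOnlyCellComparable); otherwise keying by the row bytes is
    // enough and CellSortReducer orders the cells of each row.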
    private WritableComparable<?> wrap(ExtendedCell cell) {
      if (diskBasedSortingEnabled) {
        return new KeyOnlyCellComparable(cell);
      }
      return new ImmutableBytesWritable(CellUtil.cloneRow(cell));
    }
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    Configuration conf = getConf();
    String inputDirs = args[0];
    String tabName = args[1];
    conf.setStrings(TABLES_KEY, tabName);
    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
    Job job = Job.getInstance(conf,
      conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
    // MapReduceHFileSplitter needs ExtendedCellSerialization so that sequenceId can be propagated
    // when sorting cells in CellSortReducer
    job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
      true);
    job.setJarByClass(MapReduceHFileSplitterJob.class);
    job.setInputFormatClass(HFileInputFormat.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
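    // Pick the map output key class and sort comparator to match the sorting mode: full cell keys
    // for disk-based sorting, plain row keys otherwise.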
    if (diskBasedSortingEnabled) {
      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
    } else {
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    }
    if (hfileOutPath != null) {
      LOG.debug("Adding incremental load job: " + hfileOutPath + " from " + inputDirs);
      TableName tableName = TableName.valueOf(tabName);
      job.setMapperClass(HFileCellMapper.class);
      if (diskBasedSortingEnabled) {
        job.setReducerClass(PreSortedCellsReducer.class);
      } else {
        job.setReducerClass(CellSortReducer.class);
      }
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(MapReduceExtendedCell.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = getRegionLocator(conf, conn, tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      }
      LOG.debug("Successfully configured incremental load job");

      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
    } else {
      throw new IOException("No bulk output directory specified");
    }
    return job;
  }

  /**
   * Print usage
   * @param errorMsg Error message. Can be null.
   */
  private void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
    System.err.println("Read all HFiles for <table> and split them at <table> region boundaries.");
    System.err.println("<table>  table to load.\n");
    System.err.println("To generate HFiles for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("Other options:");
    System.err.println("  -D" + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the HFile splitter");
    System.err.println("For performance also consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    Job job = createSubmittableJob(args);
    int result = job.waitForCompletion(true) ? 0 : 1;
    return result;
  }

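  /**
   * Returns the {@link RegionLocator} used to derive target region boundaries: a snapshot-based
   * locator when the configuration asks for one, otherwise the live table's locator.
   */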
  private static RegionLocator getRegionLocator(Configuration conf, Connection conn,
    TableName table) throws IOException {
    if (SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, table)) {
      return SnapshotRegionLocator.create(conf, table);
    }

    return conn.getRegionLocator(table);
  }
}