001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util.bulkdatagenerator;
019
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Parser;
056
057/**
058 * A command line utility to generate pre-splitted HBase Tables with large amount (TBs) of random
059 * data, equally distributed among all regions.
060 */
061public class BulkDataGeneratorTool {
062
063  private static final Logger logger = LoggerFactory.getLogger(BulkDataGeneratorTool.class);
064
065  /**
066   * Prefix for the generated HFiles directory
067   */
068  private static final String OUTPUT_DIRECTORY_PREFIX = "/bulk_data_generator/";
069
070  /**
071   * Number of mapper container to be launched for generating of HFiles
072   */
073  private int mapperCount;
074
075  /**
076   * Number of rows to be generated by each mapper
077   */
078  private long rowsPerMapper;
079
080  /**
081   * Table for which random data needs to be generated
082   */
083  private String table;
084
085  /**
086   * Number of splits for the {@link #table}. Number of regions for the table will be
087   * ({@link #splitCount} + 1).
088   */
089  private int splitCount;
090
091  /**
092   * Flag to delete the table (before creating) if it already exists
093   */
094  private boolean deleteTableIfExist;
095
096  /**
097   * Additional HBase meta-data options to be set for the table
098   */
099  private final Map<String, String> tableOptions = new HashMap<>();
100
101  public static void main(String[] args) throws Exception {
102    Configuration conf = HBaseConfiguration.create();
103    BulkDataGeneratorTool bulkDataGeneratorTool = new BulkDataGeneratorTool();
104    bulkDataGeneratorTool.run(conf, args);
105  }
106
107  public boolean run(Configuration conf, String[] args) throws IOException {
108    // Read CLI arguments
109    CommandLine line = null;
110    try {
111      Parser parser = new GnuParser();
112      line = parser.parse(getOptions(), args);
113      readCommandLineParameters(conf, line);
114    } catch (ParseException | IOException exception) {
115      logger.error("Error while parsing CLI arguments.", exception);
116      printUsage();
117      return false;
118    }
119
120    if (line.hasOption("-h")) {
121      printUsage();
122      return true;
123    }
124
125    Preconditions.checkArgument(!StringUtils.isEmpty(table), "Table name must not be empty");
126    Preconditions.checkArgument(mapperCount > 0, "Mapper count must be greater than 0");
127    Preconditions.checkArgument((splitCount > 0) && (splitCount < Utility.MAX_SPLIT_COUNT),
128      "Split count must be greater than 0 and less than " + Utility.MAX_SPLIT_COUNT);
129    Preconditions.checkArgument(rowsPerMapper > 0, "Rows per mapper must be greater than 0");
130
131    Path outputDirectory = generateOutputDirectory();
132    logger.info("HFiles will be generated at " + outputDirectory.toString());
133
134    try (Connection connection = ConnectionFactory.createConnection(conf)) {
135      final Admin admin = connection.getAdmin();
136      final TableName tableName = TableName.valueOf(table);
137      if (admin.tableExists(tableName)) {
138        if (deleteTableIfExist) {
139          logger.info(
140            "Deleting the table since it already exist and delete-if-exist flag is set to true");
141          Utility.deleteTable(admin, table);
142        } else {
143          logger.info("Table already exists, cannot generate HFiles for existing table.");
144          return false;
145        }
146      }
147
148      // Creating the pre-split table
149      Utility.createTable(admin, table, splitCount, tableOptions);
150      logger.info(table + " created successfully");
151
152      Job job = createSubmittableJob(conf);
153
154      Table hbaseTable = connection.getTable(tableName);
155
156      // Auto configure partitioner and reducer
157      HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseTable.getRegionLocator());
158
159      FileOutputFormat.setOutputPath(job, outputDirectory);
160
161      boolean result = job.waitForCompletion(true);
162
163      if (result) {
164        logger.info("HFiles generated successfully. Starting bulk load to " + table);
165        BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
166        Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoadedHFiles =
167          bulkLoadHFilesTool.bulkLoad(tableName, outputDirectory);
168        boolean status = !bulkLoadedHFiles.isEmpty();
169        logger.info("BulkLoadHFiles finished successfully with status " + status);
170        return status;
171      } else {
172        logger.info("Failed to generate HFiles.");
173        return false;
174      }
175    } catch (Exception e) {
176      logger.error("Failed to generate data", e);
177      return false;
178    } finally {
179      FileSystem.get(conf).deleteOnExit(outputDirectory);
180    }
181  }
182
183  protected Job createSubmittableJob(Configuration conf) throws IOException {
184
185    conf.setInt(BulkDataGeneratorMapper.SPLIT_COUNT_KEY, splitCount);
186    conf.setInt(BulkDataGeneratorInputFormat.MAPPER_TASK_COUNT_KEY, mapperCount);
187    conf.setLong(BulkDataGeneratorRecordReader.RECORDS_PER_MAPPER_TASK_KEY, rowsPerMapper);
188
189    Job job = new Job(conf, BulkDataGeneratorTool.class.getSimpleName() + " - " + table);
190
191    job.setJarByClass(BulkDataGeneratorMapper.class);
192    job.setInputFormatClass(BulkDataGeneratorInputFormat.class);
193
194    HBaseConfiguration.addHbaseResources(conf);
195
196    job.setMapperClass(BulkDataGeneratorMapper.class);
197    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
198    job.setMapOutputValueClass(KeyValue.class);
199
200    return job;
201  }
202
203  /** Returns Random output directory path where HFiles will be generated */
204  protected Path generateOutputDirectory() {
205    final String outputDirectory =
206      OUTPUT_DIRECTORY_PREFIX + "/" + table + "-" + System.currentTimeMillis();
207    return new Path(outputDirectory);
208  }
209
210  /**
211   * This method parses the command line parameters into instance variables
212   */
213  protected void readCommandLineParameters(Configuration conf, CommandLine line)
214    throws ParseException, IOException {
215    final List<String> genericParameters = new ArrayList<String>();
216
217    // Parse the generic options
218    for (Map.Entry<Object, Object> entry : line.getOptionProperties("D").entrySet()) {
219      genericParameters.add("-D");
220      genericParameters.add(entry.getKey() + "=" + entry.getValue());
221    }
222
223    logger.info(
224      "Parsed generic parameters: " + Arrays.toString(genericParameters.toArray(new String[0])));
225
226    new GenericOptionsParser(conf, genericParameters.toArray(new String[0]));
227
228    table = line.getOptionValue("table");
229
230    if (line.hasOption("mapper-count")) {
231      mapperCount = Integer.parseInt(line.getOptionValue("mapper-count"));
232    }
233    if (line.hasOption("split-count")) {
234      splitCount = Integer.parseInt(line.getOptionValue("split-count"));
235    }
236    if (line.hasOption("rows-per-mapper")) {
237      rowsPerMapper = Long.parseLong(line.getOptionValue("rows-per-mapper"));
238    }
239
240    deleteTableIfExist = line.hasOption("delete-if-exist");
241
242    parseTableOptions(line);
243  }
244
245  private void parseTableOptions(final CommandLine line) {
246    final String tableOptionsAsString = line.getOptionValue("table-options");
247    if (!StringUtils.isEmpty(tableOptionsAsString)) {
248      for (String tableOption : tableOptionsAsString.split(",")) {
249        final String[] keyValueSplit = tableOption.split("=");
250        final String key = keyValueSplit[0];
251        final String value = keyValueSplit[1];
252        tableOptions.put(key, value);
253      }
254    }
255  }
256
257  /** Returns the command line option for {@link BulkDataGeneratorTool} */
258  protected Options getOptions() {
259    final Options options = new Options();
260    Option option =
261      new Option("t", "table", true, "The table name for which data need to be generated.");
262    options.addOption(option);
263
264    option = new Option("d", "delete-if-exist", false,
265      "If it's set, the table will be deleted if already exist.");
266    options.addOption(option);
267
268    option =
269      new Option("mc", "mapper-count", true, "The number of mapper containers to be launched.");
270    options.addOption(option);
271
272    option = new Option("sc", "split-count", true,
273      "The number of regions/pre-splits to be created for the table.");
274    options.addOption(option);
275
276    option =
277      new Option("r", "rows-per-mapper", true, "The number of rows to be generated PER mapper.");
278    options.addOption(option);
279
280    option =
281      new Option("o", "table-options", true, "Table options to be set while creating the table.");
282    options.addOption(option);
283
284    option = new Option("h", "help", false, "Show help message for the tool");
285    options.addOption(option);
286
287    return options;
288  }
289
290  protected void printUsage() {
291    final HelpFormatter helpFormatter = new HelpFormatter();
292    helpFormatter.setWidth(120);
293    final String helpMessageCommand = "hbase " + BulkDataGeneratorTool.class.getName();
294    final String commandSyntax = helpMessageCommand + " <OPTIONS> [-D<property=value>]*";
295    final String helpMessageSuffix = "Examples:\n" + helpMessageCommand
296      + " -t TEST_TABLE -mc 10 -r 100 -sc 10\n" + helpMessageCommand
297      + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -d -o \"BACKUP=false,NORMALIZATION_ENABLED=false\"\n"
298      + helpMessageCommand + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -Dmapreduce.map.memory.mb=8192\n";
299    helpFormatter.printHelp(commandSyntax, "", getOptions(), helpMessageSuffix);
300  }
301}