001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util.bulkdatagenerator; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import org.apache.commons.lang3.StringUtils; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Admin; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.ConnectionFactory; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 039import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; 040import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 041import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 044import org.apache.hadoop.util.GenericOptionsParser; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 049import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 050import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; 051import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 052import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 053import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 054import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 055import org.apache.hbase.thirdparty.org.apache.commons.cli.Parser; 056 057/** 058 * A command line utility to generate pre-splitted HBase Tables with large amount (TBs) of random 059 * data, equally distributed among all regions. 060 */ 061public class BulkDataGeneratorTool { 062 063 private static final Logger logger = LoggerFactory.getLogger(BulkDataGeneratorTool.class); 064 065 /** 066 * Prefix for the generated HFiles directory 067 */ 068 private static final String OUTPUT_DIRECTORY_PREFIX = "/bulk_data_generator/"; 069 070 /** 071 * Number of mapper container to be launched for generating of HFiles 072 */ 073 private int mapperCount; 074 075 /** 076 * Number of rows to be generated by each mapper 077 */ 078 private long rowsPerMapper; 079 080 /** 081 * Table for which random data needs to be generated 082 */ 083 private String table; 084 085 /** 086 * Number of splits for the {@link #table}. Number of regions for the table will be 087 * ({@link #splitCount} + 1). 088 */ 089 private int splitCount; 090 091 /** 092 * Flag to delete the table (before creating) if it already exists 093 */ 094 private boolean deleteTableIfExist; 095 096 /** 097 * Additional HBase meta-data options to be set for the table 098 */ 099 private final Map<String, String> tableOptions = new HashMap<>(); 100 101 public static void main(String[] args) throws Exception { 102 Configuration conf = HBaseConfiguration.create(); 103 BulkDataGeneratorTool bulkDataGeneratorTool = new BulkDataGeneratorTool(); 104 bulkDataGeneratorTool.run(conf, args); 105 } 106 107 public boolean run(Configuration conf, String[] args) throws IOException { 108 // Read CLI arguments 109 CommandLine line = null; 110 try { 111 Parser parser = new GnuParser(); 112 line = parser.parse(getOptions(), args); 113 readCommandLineParameters(conf, line); 114 } catch (ParseException | IOException exception) { 115 logger.error("Error while parsing CLI arguments.", exception); 116 printUsage(); 117 return false; 118 } 119 120 if (line.hasOption("-h")) { 121 printUsage(); 122 return true; 123 } 124 125 Preconditions.checkArgument(!StringUtils.isEmpty(table), "Table name must not be empty"); 126 Preconditions.checkArgument(mapperCount > 0, "Mapper count must be greater than 0"); 127 Preconditions.checkArgument((splitCount > 0) && (splitCount < Utility.MAX_SPLIT_COUNT), 128 "Split count must be greater than 0 and less than " + Utility.MAX_SPLIT_COUNT); 129 Preconditions.checkArgument(rowsPerMapper > 0, "Rows per mapper must be greater than 0"); 130 131 Path outputDirectory = generateOutputDirectory(); 132 logger.info("HFiles will be generated at " + outputDirectory.toString()); 133 134 try (Connection connection = ConnectionFactory.createConnection(conf)) { 135 final Admin admin = connection.getAdmin(); 136 final TableName tableName = TableName.valueOf(table); 137 if (admin.tableExists(tableName)) { 138 if (deleteTableIfExist) { 139 logger.info( 140 "Deleting the table since it already exist and delete-if-exist flag is set to true"); 141 Utility.deleteTable(admin, table); 142 } else { 143 logger.info("Table already exists, cannot generate HFiles for existing table."); 144 return false; 145 } 146 } 147 148 // Creating the pre-split table 149 Utility.createTable(admin, table, splitCount, tableOptions); 150 logger.info(table + " created successfully"); 151 152 Job job = createSubmittableJob(conf); 153 154 Table hbaseTable = connection.getTable(tableName); 155 156 // Auto configure partitioner and reducer 157 HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseTable.getRegionLocator()); 158 159 FileOutputFormat.setOutputPath(job, outputDirectory); 160 161 boolean result = job.waitForCompletion(true); 162 163 if (result) { 164 logger.info("HFiles generated successfully. Starting bulk load to " + table); 165 BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf); 166 Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoadedHFiles = 167 bulkLoadHFilesTool.bulkLoad(tableName, outputDirectory); 168 boolean status = !bulkLoadedHFiles.isEmpty(); 169 logger.info("BulkLoadHFiles finished successfully with status " + status); 170 return status; 171 } else { 172 logger.info("Failed to generate HFiles."); 173 return false; 174 } 175 } catch (Exception e) { 176 logger.error("Failed to generate data", e); 177 return false; 178 } finally { 179 FileSystem.get(conf).deleteOnExit(outputDirectory); 180 } 181 } 182 183 protected Job createSubmittableJob(Configuration conf) throws IOException { 184 185 conf.setInt(BulkDataGeneratorMapper.SPLIT_COUNT_KEY, splitCount); 186 conf.setInt(BulkDataGeneratorInputFormat.MAPPER_TASK_COUNT_KEY, mapperCount); 187 conf.setLong(BulkDataGeneratorRecordReader.RECORDS_PER_MAPPER_TASK_KEY, rowsPerMapper); 188 189 Job job = new Job(conf, BulkDataGeneratorTool.class.getSimpleName() + " - " + table); 190 191 job.setJarByClass(BulkDataGeneratorMapper.class); 192 job.setInputFormatClass(BulkDataGeneratorInputFormat.class); 193 194 HBaseConfiguration.addHbaseResources(conf); 195 196 job.setMapperClass(BulkDataGeneratorMapper.class); 197 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 198 job.setMapOutputValueClass(KeyValue.class); 199 200 return job; 201 } 202 203 /** Returns Random output directory path where HFiles will be generated */ 204 protected Path generateOutputDirectory() { 205 final String outputDirectory = 206 OUTPUT_DIRECTORY_PREFIX + "/" + table + "-" + System.currentTimeMillis(); 207 return new Path(outputDirectory); 208 } 209 210 /** 211 * This method parses the command line parameters into instance variables 212 */ 213 protected void readCommandLineParameters(Configuration conf, CommandLine line) 214 throws ParseException, IOException { 215 final List<String> genericParameters = new ArrayList<String>(); 216 217 // Parse the generic options 218 for (Map.Entry<Object, Object> entry : line.getOptionProperties("D").entrySet()) { 219 genericParameters.add("-D"); 220 genericParameters.add(entry.getKey() + "=" + entry.getValue()); 221 } 222 223 logger.info( 224 "Parsed generic parameters: " + Arrays.toString(genericParameters.toArray(new String[0]))); 225 226 new GenericOptionsParser(conf, genericParameters.toArray(new String[0])); 227 228 table = line.getOptionValue("table"); 229 230 if (line.hasOption("mapper-count")) { 231 mapperCount = Integer.parseInt(line.getOptionValue("mapper-count")); 232 } 233 if (line.hasOption("split-count")) { 234 splitCount = Integer.parseInt(line.getOptionValue("split-count")); 235 } 236 if (line.hasOption("rows-per-mapper")) { 237 rowsPerMapper = Long.parseLong(line.getOptionValue("rows-per-mapper")); 238 } 239 240 deleteTableIfExist = line.hasOption("delete-if-exist"); 241 242 parseTableOptions(line); 243 } 244 245 private void parseTableOptions(final CommandLine line) { 246 final String tableOptionsAsString = line.getOptionValue("table-options"); 247 if (!StringUtils.isEmpty(tableOptionsAsString)) { 248 for (String tableOption : tableOptionsAsString.split(",")) { 249 final String[] keyValueSplit = tableOption.split("="); 250 final String key = keyValueSplit[0]; 251 final String value = keyValueSplit[1]; 252 tableOptions.put(key, value); 253 } 254 } 255 } 256 257 /** Returns the command line option for {@link BulkDataGeneratorTool} */ 258 protected Options getOptions() { 259 final Options options = new Options(); 260 Option option = 261 new Option("t", "table", true, "The table name for which data need to be generated."); 262 options.addOption(option); 263 264 option = new Option("d", "delete-if-exist", false, 265 "If it's set, the table will be deleted if already exist."); 266 options.addOption(option); 267 268 option = 269 new Option("mc", "mapper-count", true, "The number of mapper containers to be launched."); 270 options.addOption(option); 271 272 option = new Option("sc", "split-count", true, 273 "The number of regions/pre-splits to be created for the table."); 274 options.addOption(option); 275 276 option = 277 new Option("r", "rows-per-mapper", true, "The number of rows to be generated PER mapper."); 278 options.addOption(option); 279 280 option = 281 new Option("o", "table-options", true, "Table options to be set while creating the table."); 282 options.addOption(option); 283 284 option = new Option("h", "help", false, "Show help message for the tool"); 285 options.addOption(option); 286 287 return options; 288 } 289 290 protected void printUsage() { 291 final HelpFormatter helpFormatter = new HelpFormatter(); 292 helpFormatter.setWidth(120); 293 final String helpMessageCommand = "hbase " + BulkDataGeneratorTool.class.getName(); 294 final String commandSyntax = helpMessageCommand + " <OPTIONS> [-D<property=value>]*"; 295 final String helpMessageSuffix = "Examples:\n" + helpMessageCommand 296 + " -t TEST_TABLE -mc 10 -r 100 -sc 10\n" + helpMessageCommand 297 + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -d -o \"BACKUP=false,NORMALIZATION_ENABLED=false\"\n" 298 + helpMessageCommand + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -Dmapreduce.map.memory.mb=8192\n"; 299 helpFormatter.printHelp(commandSyntax, "", getOptions(), helpMessageSuffix); 300 } 301}