001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.text.ParseException; 022import java.text.SimpleDateFormat; 023import java.util.Map; 024import java.util.TreeMap; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.conf.Configured; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.KeyValue; 033import org.apache.hadoop.hbase.KeyValueUtil; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.ConnectionFactory; 037import org.apache.hadoop.hbase.client.Delete; 038import org.apache.hadoop.hbase.client.Mutation; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.RegionLocator; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 043import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.util.MapReduceExtendedCell; 046import org.apache.hadoop.hbase.wal.WALEdit; 047import org.apache.hadoop.hbase.wal.WALKey; 048import org.apache.hadoop.mapreduce.Job; 049import org.apache.hadoop.mapreduce.Mapper; 050import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 051import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 052import org.apache.hadoop.util.Tool; 053import org.apache.hadoop.util.ToolRunner; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058/** 059 * A tool to replay WAL files as a M/R job. 060 * The WAL can be replayed for a set of tables or all tables, 061 * and a time range can be provided (in milliseconds). 062 * The WAL is filtered to the passed set of tables and the output 063 * can optionally be mapped to another set of tables. 064 * 065 * WAL replay can also generate HFiles for later bulk importing, 066 * in that case the WAL is replayed for a single table only. 067 */ 068@InterfaceAudience.Public 069public class WALPlayer extends Configured implements Tool { 070 private static final Logger LOG = LoggerFactory.getLogger(WALPlayer.class); 071 final static String NAME = "WALPlayer"; 072 public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output"; 073 public final static String TABLES_KEY = "wal.input.tables"; 074 public final static String TABLE_MAP_KEY = "wal.input.tablesmap"; 075 public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator"; 076 public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files"; 077 078 079 // This relies on Hadoop Configuration to handle warning about deprecated configs and 080 // to set the correct non-deprecated configs when an old one shows up. 081 static { 082 Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY); 083 Configuration.addDeprecation("hlog.input.tables", TABLES_KEY); 084 Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY); 085 } 086 087 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; 088 089 public WALPlayer(){ 090 } 091 092 protected WALPlayer(final Configuration c) { 093 super(c); 094 } 095 096 /** 097 * A mapper that just writes out KeyValues. 098 * This one can be used together with {@link KeyValueSortReducer} 099 * @deprecated Use {@link WALCellMapper}. Will be removed from 3.0 onwards 100 */ 101 @Deprecated 102 static class WALKeyValueMapper 103 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> { 104 private byte[] table; 105 106 @Override 107 public void map(WALKey key, WALEdit value, 108 Context context) 109 throws IOException { 110 try { 111 // skip all other tables 112 if (Bytes.equals(table, key.getTableName().getName())) { 113 for (Cell cell : value.getCells()) { 114 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); 115 if (WALEdit.isMetaEditFamily(kv)) { 116 continue; 117 } 118 context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv); 119 } 120 } 121 } catch (InterruptedException e) { 122 e.printStackTrace(); 123 } 124 } 125 126 @Override 127 public void setup(Context context) throws IOException { 128 // only a single table is supported when HFiles are generated with HFileOutputFormat 129 String[] tables = context.getConfiguration().getStrings(TABLES_KEY); 130 if (tables == null || tables.length != 1) { 131 // this can only happen when WALMapper is used directly by a class other than WALPlayer 132 throw new IOException("Exactly one table must be specified for bulk HFile case."); 133 } 134 table = Bytes.toBytes(tables[0]); 135 136 } 137 138 } 139 /** 140 * A mapper that just writes out Cells. 141 * This one can be used together with {@link CellSortReducer} 142 */ 143 static class WALCellMapper 144 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> { 145 private byte[] table; 146 147 @Override 148 public void map(WALKey key, WALEdit value, 149 Context context) 150 throws IOException { 151 try { 152 // skip all other tables 153 if (Bytes.equals(table, key.getTableName().getName())) { 154 for (Cell cell : value.getCells()) { 155 if (WALEdit.isMetaEditFamily(cell)) { 156 continue; 157 } 158 context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), 159 new MapReduceExtendedCell(cell)); 160 } 161 } 162 } catch (InterruptedException e) { 163 e.printStackTrace(); 164 } 165 } 166 167 @Override 168 public void setup(Context context) throws IOException { 169 // only a single table is supported when HFiles are generated with HFileOutputFormat 170 String[] tables = context.getConfiguration().getStrings(TABLES_KEY); 171 if (tables == null || tables.length != 1) { 172 // this can only happen when WALMapper is used directly by a class other than WALPlayer 173 throw new IOException("Exactly one table must be specified for bulk HFile case."); 174 } 175 table = Bytes.toBytes(tables[0]); 176 177 } 178 179 } 180 181 /** 182 * A mapper that writes out {@link Mutation} to be directly applied to 183 * a running HBase instance. 184 */ 185 protected static class WALMapper 186 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> { 187 private Map<TableName, TableName> tables = new TreeMap<>(); 188 189 @Override 190 public void map(WALKey key, WALEdit value, Context context) 191 throws IOException { 192 try { 193 if (tables.isEmpty() || tables.containsKey(key.getTableName())) { 194 TableName targetTable = tables.isEmpty() ? 195 key.getTableName() : 196 tables.get(key.getTableName()); 197 ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName()); 198 Put put = null; 199 Delete del = null; 200 Cell lastCell = null; 201 for (Cell cell : value.getCells()) { 202 // Filtering WAL meta marker entries. 203 if (WALEdit.isMetaEditFamily(cell)) { 204 continue; 205 } 206 // Allow a subclass filter out this cell. 207 if (filter(context, cell)) { 208 // A WALEdit may contain multiple operations (HBASE-3584) and/or 209 // multiple rows (HBASE-5229). 210 // Aggregate as much as possible into a single Put/Delete 211 // operation before writing to the context. 212 if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte() 213 || !CellUtil.matchingRows(lastCell, cell)) { 214 // row or type changed, write out aggregate KVs. 215 if (put != null) { 216 context.write(tableOut, put); 217 } 218 if (del != null) { 219 context.write(tableOut, del); 220 } 221 if (CellUtil.isDelete(cell)) { 222 del = new Delete(CellUtil.cloneRow(cell)); 223 } else { 224 put = new Put(CellUtil.cloneRow(cell)); 225 } 226 } 227 if (CellUtil.isDelete(cell)) { 228 del.add(cell); 229 } else { 230 put.add(cell); 231 } 232 } 233 lastCell = cell; 234 } 235 // write residual KVs 236 if (put != null) { 237 context.write(tableOut, put); 238 } 239 if (del != null) { 240 context.write(tableOut, del); 241 } 242 } 243 } catch (InterruptedException e) { 244 e.printStackTrace(); 245 } 246 } 247 248 protected boolean filter(Context context, final Cell cell) { 249 return true; 250 } 251 252 @Override 253 protected void 254 cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) 255 throws IOException, InterruptedException { 256 super.cleanup(context); 257 } 258 259 @Override 260 public void setup(Context context) throws IOException { 261 String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY); 262 String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY); 263 if (tableMap == null) { 264 tableMap = tablesToUse; 265 } 266 if (tablesToUse == null) { 267 // Then user wants all tables. 268 } else if (tablesToUse.length != tableMap.length) { 269 // this can only happen when WALMapper is used directly by a class other than WALPlayer 270 throw new IOException("Incorrect table mapping specified ."); 271 } 272 int i = 0; 273 if (tablesToUse != null) { 274 for (String table : tablesToUse) { 275 tables.put(TableName.valueOf(table), 276 TableName.valueOf(tableMap[i++])); 277 } 278 } 279 } 280 } 281 282 void setupTime(Configuration conf, String option) throws IOException { 283 String val = conf.get(option); 284 if (null == val) { 285 return; 286 } 287 long ms; 288 try { 289 // first try to parse in user friendly form 290 ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime(); 291 } catch (ParseException pe) { 292 try { 293 // then see if just a number of ms's was specified 294 ms = Long.parseLong(val); 295 } catch (NumberFormatException nfe) { 296 throw new IOException(option 297 + " must be specified either in the form 2001-02-20T16:35:06.99 " 298 + "or as number of milliseconds"); 299 } 300 } 301 conf.setLong(option, ms); 302 } 303 304 /** 305 * Sets up the actual job. 306 * 307 * @param args The command line parameters. 308 * @return The newly created job. 309 * @throws IOException When setting up the job fails. 310 */ 311 public Job createSubmittableJob(String[] args) throws IOException { 312 Configuration conf = getConf(); 313 setupTime(conf, WALInputFormat.START_TIME_KEY); 314 setupTime(conf, WALInputFormat.END_TIME_KEY); 315 String inputDirs = args[0]; 316 String[] tables = args[1].split(","); 317 String[] tableMap; 318 if (args.length > 2) { 319 tableMap = args[2].split(","); 320 if (tableMap.length != tables.length) { 321 throw new IOException("The same number of tables and mapping must be provided."); 322 } 323 } else { 324 // if not mapping is specified map each table to itself 325 tableMap = tables; 326 } 327 conf.setStrings(TABLES_KEY, tables); 328 conf.setStrings(TABLE_MAP_KEY, tableMap); 329 conf.set(FileInputFormat.INPUT_DIR, inputDirs); 330 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis())); 331 job.setJarByClass(WALPlayer.class); 332 333 job.setInputFormatClass(WALInputFormat.class); 334 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 335 336 String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); 337 if (hfileOutPath != null) { 338 LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); 339 340 // the bulk HFile case 341 if (tables.length != 1) { 342 throw new IOException("Exactly one table must be specified for the bulk export option"); 343 } 344 TableName tableName = TableName.valueOf(tables[0]); 345 job.setMapperClass(WALCellMapper.class); 346 job.setReducerClass(CellSortReducer.class); 347 Path outputDir = new Path(hfileOutPath); 348 FileOutputFormat.setOutputPath(job, outputDir); 349 job.setMapOutputValueClass(MapReduceExtendedCell.class); 350 try (Connection conn = ConnectionFactory.createConnection(conf); 351 Table table = conn.getTable(tableName); 352 RegionLocator regionLocator = conn.getRegionLocator(tableName)) { 353 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 354 } 355 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 356 org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class); 357 } else { 358 // output to live cluster 359 job.setMapperClass(WALMapper.class); 360 job.setOutputFormatClass(MultiTableOutputFormat.class); 361 TableMapReduceUtil.addDependencyJars(job); 362 TableMapReduceUtil.initCredentials(job); 363 // No reducers. 364 job.setNumReduceTasks(0); 365 } 366 String codecCls = WALCellCodec.getWALCellCodecClass(conf).getName(); 367 try { 368 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Class.forName(codecCls)); 369 } catch (Exception e) { 370 throw new IOException("Cannot determine wal codec class " + codecCls, e); 371 } 372 return job; 373 } 374 375 376 /** 377 * Print usage 378 * @param errorMsg Error message. Can be null. 379 */ 380 private void usage(final String errorMsg) { 381 if (errorMsg != null && errorMsg.length() > 0) { 382 System.err.println("ERROR: " + errorMsg); 383 } 384 System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]"); 385 System.err.println("Replay all WAL files into HBase."); 386 System.err.println("<tables> is a comma separated list of tables."); 387 System.err.println("If no tables (\"\") are specified, all tables are imported."); 388 System.err.println("(Be careful, hbase:meta entries will be imported in this case.)\n"); 389 System.err.println("WAL entries can be mapped to new set of tables via <tableMappings>."); 390 System.err.println("<tableMappings> is a comma separated list of target tables."); 391 System.err.println("If specified, each table in <tables> must have a mapping.\n"); 392 System.err.println("By default " + NAME + " will load data directly into HBase."); 393 System.err.println("To generate HFiles for a bulk data load instead, pass the following option:"); 394 System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); 395 System.err.println(" (Only one table can be specified, and no mapping is allowed!)"); 396 System.err.println("Time range options:"); 397 System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]"); 398 System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]"); 399 System.err.println(" (The start and the end date of timerange. The dates can be expressed"); 400 System.err.println(" in milliseconds since epoch or in yyyy-MM-dd'T'HH:mm:ss.SS format."); 401 System.err.println(" E.g. 1234567890120 or 2009-02-13T23:32:30.12)"); 402 System.err.println("Other options:"); 403 System.err.println(" -D" + JOB_NAME_CONF_KEY + "=jobName"); 404 System.err.println(" Use the specified mapreduce job name for the wal player"); 405 System.err.println("For performance also consider the following options:\n" 406 + " -Dmapreduce.map.speculative=false\n" 407 + " -Dmapreduce.reduce.speculative=false"); 408 } 409 410 /** 411 * Main entry point. 412 * 413 * @param args The command line parameters. 414 * @throws Exception When running the job fails. 415 */ 416 public static void main(String[] args) throws Exception { 417 int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args); 418 System.exit(ret); 419 } 420 421 @Override 422 public int run(String[] args) throws Exception { 423 if (args.length < 2) { 424 usage("Wrong number of arguments: " + args.length); 425 System.exit(-1); 426 } 427 Job job = createSubmittableJob(args); 428 return job.waitForCompletion(true) ? 0 : 1; 429 } 430}