001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.text.ParseException; 022import java.text.SimpleDateFormat; 023import java.util.Map; 024import java.util.TreeMap; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.conf.Configured; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellUtil; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.KeyValue; 032import org.apache.hadoop.hbase.KeyValueUtil; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Delete; 037import org.apache.hadoop.hbase.client.Mutation; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.RegionLocator; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 042import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.MapReduceExtendedCell; 045import org.apache.hadoop.hbase.wal.WALEdit; 046import org.apache.hadoop.hbase.wal.WALKey; 047import org.apache.hadoop.mapreduce.Job; 048import org.apache.hadoop.mapreduce.Mapper; 049import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 050import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 051import org.apache.hadoop.util.Tool; 052import org.apache.hadoop.util.ToolRunner; 053import org.apache.yetus.audience.InterfaceAudience; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057 058 059/** 060 * A tool to replay WAL files as a M/R job. 061 * The WAL can be replayed for a set of tables or all tables, 062 * and a time range can be provided (in milliseconds). 063 * The WAL is filtered to the passed set of tables and the output 064 * can optionally be mapped to another set of tables. 065 * 066 * WAL replay can also generate HFiles for later bulk importing, 067 * in that case the WAL is replayed for a single table only. 068 */ 069@InterfaceAudience.Public 070public class WALPlayer extends Configured implements Tool { 071 private static final Logger LOG = LoggerFactory.getLogger(WALPlayer.class); 072 final static String NAME = "WALPlayer"; 073 public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output"; 074 public final static String TABLES_KEY = "wal.input.tables"; 075 public final static String TABLE_MAP_KEY = "wal.input.tablesmap"; 076 public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator"; 077 public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files"; 078 079 080 // This relies on Hadoop Configuration to handle warning about deprecated configs and 081 // to set the correct non-deprecated configs when an old one shows up. 082 static { 083 Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY); 084 Configuration.addDeprecation("hlog.input.tables", TABLES_KEY); 085 Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY); 086 } 087 088 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; 089 090 public WALPlayer(){ 091 } 092 093 protected WALPlayer(final Configuration c) { 094 super(c); 095 } 096 097 /** 098 * A mapper that just writes out KeyValues. 099 * This one can be used together with {@link KeyValueSortReducer} 100 * @deprecated Use {@link WALCellMapper}. Will be removed from 3.0 onwards 101 */ 102 @Deprecated 103 static class WALKeyValueMapper 104 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> { 105 private byte[] table; 106 107 @Override 108 public void map(WALKey key, WALEdit value, 109 Context context) 110 throws IOException { 111 try { 112 // skip all other tables 113 if (Bytes.equals(table, key.getTableName().getName())) { 114 for (Cell cell : value.getCells()) { 115 KeyValue kv = KeyValueUtil.ensureKeyValue(cell); 116 if (WALEdit.isMetaEditFamily(kv)) { 117 continue; 118 } 119 context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv); 120 } 121 } 122 } catch (InterruptedException e) { 123 e.printStackTrace(); 124 } 125 } 126 127 @Override 128 public void setup(Context context) throws IOException { 129 // only a single table is supported when HFiles are generated with HFileOutputFormat 130 String[] tables = context.getConfiguration().getStrings(TABLES_KEY); 131 if (tables == null || tables.length != 1) { 132 // this can only happen when WALMapper is used directly by a class other than WALPlayer 133 throw new IOException("Exactly one table must be specified for bulk HFile case."); 134 } 135 table = Bytes.toBytes(tables[0]); 136 137 } 138 139 } 140 /** 141 * A mapper that just writes out Cells. 142 * This one can be used together with {@link CellSortReducer} 143 */ 144 static class WALCellMapper 145 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> { 146 private byte[] table; 147 148 @Override 149 public void map(WALKey key, WALEdit value, 150 Context context) 151 throws IOException { 152 try { 153 // skip all other tables 154 if (Bytes.equals(table, key.getTableName().getName())) { 155 for (Cell cell : value.getCells()) { 156 if (WALEdit.isMetaEditFamily(cell)) { 157 continue; 158 } 159 context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)), 160 new MapReduceExtendedCell(cell)); 161 } 162 } 163 } catch (InterruptedException e) { 164 e.printStackTrace(); 165 } 166 } 167 168 @Override 169 public void setup(Context context) throws IOException { 170 // only a single table is supported when HFiles are generated with HFileOutputFormat 171 String[] tables = context.getConfiguration().getStrings(TABLES_KEY); 172 if (tables == null || tables.length != 1) { 173 // this can only happen when WALMapper is used directly by a class other than WALPlayer 174 throw new IOException("Exactly one table must be specified for bulk HFile case."); 175 } 176 table = Bytes.toBytes(tables[0]); 177 178 } 179 180 } 181 182 /** 183 * Enum for map metrics. Keep it out here rather than inside in the Map 184 * inner-class so we can find associated properties. 185 */ 186 protected static enum Counter { 187 /** Number of aggregated writes */ 188 PUTS, 189 /** Number of aggregated deletes */ 190 DELETES, 191 CELLS_READ, 192 CELLS_WRITTEN, 193 WALEDITS 194 } 195 196 /** 197 * A mapper that writes out {@link Mutation} to be directly applied to 198 * a running HBase instance. 199 */ 200 protected static class WALMapper 201 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> { 202 private Map<TableName, TableName> tables = new TreeMap<>(); 203 204 @Override 205 public void map(WALKey key, WALEdit value, Context context) 206 throws IOException { 207 context.getCounter(Counter.WALEDITS).increment(1); 208 try { 209 if (tables.isEmpty() || tables.containsKey(key.getTableName())) { 210 TableName targetTable = tables.isEmpty() ? 211 key.getTableName() : 212 tables.get(key.getTableName()); 213 ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName()); 214 Put put = null; 215 Delete del = null; 216 Cell lastCell = null; 217 for (Cell cell : value.getCells()) { 218 context.getCounter(Counter.CELLS_READ).increment(1); 219 // Filtering WAL meta marker entries. 220 if (WALEdit.isMetaEditFamily(cell)) { 221 continue; 222 } 223 // Allow a subclass filter out this cell. 224 if (filter(context, cell)) { 225 // A WALEdit may contain multiple operations (HBASE-3584) and/or 226 // multiple rows (HBASE-5229). 227 // Aggregate as much as possible into a single Put/Delete 228 // operation before writing to the context. 229 if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte() 230 || !CellUtil.matchingRows(lastCell, cell)) { 231 // row or type changed, write out aggregate KVs. 232 if (put != null) { 233 context.write(tableOut, put); 234 context.getCounter(Counter.PUTS).increment(1); 235 } 236 if (del != null) { 237 context.write(tableOut, del); 238 context.getCounter(Counter.DELETES).increment(1); 239 } 240 if (CellUtil.isDelete(cell)) { 241 del = new Delete(CellUtil.cloneRow(cell)); 242 } else { 243 put = new Put(CellUtil.cloneRow(cell)); 244 } 245 } 246 if (CellUtil.isDelete(cell)) { 247 del.add(cell); 248 } else { 249 put.add(cell); 250 } 251 context.getCounter(Counter.CELLS_WRITTEN).increment(1); 252 } 253 lastCell = cell; 254 } 255 // write residual KVs 256 if (put != null) { 257 context.write(tableOut, put); 258 context.getCounter(Counter.PUTS).increment(1); 259 } 260 if (del != null) { 261 context.getCounter(Counter.DELETES).increment(1); 262 context.write(tableOut, del); 263 } 264 } 265 } catch (InterruptedException e) { 266 e.printStackTrace(); 267 } 268 } 269 270 protected boolean filter(Context context, final Cell cell) { 271 return true; 272 } 273 274 @Override 275 protected void 276 cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) 277 throws IOException, InterruptedException { 278 super.cleanup(context); 279 } 280 281 @SuppressWarnings("checkstyle:EmptyBlock") 282 @Override 283 public void setup(Context context) throws IOException { 284 String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY); 285 String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY); 286 if (tableMap == null) { 287 tableMap = tablesToUse; 288 } 289 if (tablesToUse == null) { 290 // Then user wants all tables. 291 } else if (tablesToUse.length != tableMap.length) { 292 // this can only happen when WALMapper is used directly by a class other than WALPlayer 293 throw new IOException("Incorrect table mapping specified ."); 294 } 295 int i = 0; 296 if (tablesToUse != null) { 297 for (String table : tablesToUse) { 298 tables.put(TableName.valueOf(table), 299 TableName.valueOf(tableMap[i++])); 300 } 301 } 302 } 303 } 304 305 void setupTime(Configuration conf, String option) throws IOException { 306 String val = conf.get(option); 307 if (null == val) { 308 return; 309 } 310 long ms; 311 try { 312 // first try to parse in user friendly form 313 ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime(); 314 } catch (ParseException pe) { 315 try { 316 // then see if just a number of ms's was specified 317 ms = Long.parseLong(val); 318 } catch (NumberFormatException nfe) { 319 throw new IOException(option 320 + " must be specified either in the form 2001-02-20T16:35:06.99 " 321 + "or as number of milliseconds"); 322 } 323 } 324 conf.setLong(option, ms); 325 } 326 327 /** 328 * Sets up the actual job. 329 * 330 * @param args The command line parameters. 331 * @return The newly created job. 332 * @throws IOException When setting up the job fails. 333 */ 334 public Job createSubmittableJob(String[] args) throws IOException { 335 Configuration conf = getConf(); 336 setupTime(conf, WALInputFormat.START_TIME_KEY); 337 setupTime(conf, WALInputFormat.END_TIME_KEY); 338 String inputDirs = args[0]; 339 String[] tables = args.length == 1? new String [] {}: args[1].split(","); 340 String[] tableMap; 341 if (args.length > 2) { 342 tableMap = args[2].split(","); 343 if (tableMap.length != tables.length) { 344 throw new IOException("The same number of tables and mapping must be provided."); 345 } 346 } else { 347 // if no mapping is specified, map each table to itself 348 tableMap = tables; 349 } 350 conf.setStrings(TABLES_KEY, tables); 351 conf.setStrings(TABLE_MAP_KEY, tableMap); 352 conf.set(FileInputFormat.INPUT_DIR, inputDirs); 353 Job job = 354 Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis())); 355 job.setJarByClass(WALPlayer.class); 356 357 job.setInputFormatClass(WALInputFormat.class); 358 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 359 360 String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); 361 if (hfileOutPath != null) { 362 LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); 363 364 // the bulk HFile case 365 if (tables.length != 1) { 366 throw new IOException("Exactly one table must be specified for the bulk export option"); 367 } 368 TableName tableName = TableName.valueOf(tables[0]); 369 job.setMapperClass(WALCellMapper.class); 370 job.setReducerClass(CellSortReducer.class); 371 Path outputDir = new Path(hfileOutPath); 372 FileOutputFormat.setOutputPath(job, outputDir); 373 job.setMapOutputValueClass(MapReduceExtendedCell.class); 374 try (Connection conn = ConnectionFactory.createConnection(conf); 375 Table table = conn.getTable(tableName); 376 RegionLocator regionLocator = conn.getRegionLocator(tableName)) { 377 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 378 } 379 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 380 org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class); 381 } else { 382 // output to live cluster 383 job.setMapperClass(WALMapper.class); 384 job.setOutputFormatClass(MultiTableOutputFormat.class); 385 TableMapReduceUtil.addDependencyJars(job); 386 TableMapReduceUtil.initCredentials(job); 387 // No reducers. 388 job.setNumReduceTasks(0); 389 } 390 String codecCls = WALCellCodec.getWALCellCodecClass(conf).getName(); 391 try { 392 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Class.forName(codecCls)); 393 } catch (Exception e) { 394 throw new IOException("Cannot determine wal codec class " + codecCls, e); 395 } 396 return job; 397 } 398 399 400 /** 401 * Print usage 402 * @param errorMsg Error message. Can be null. 403 */ 404 private void usage(final String errorMsg) { 405 if (errorMsg != null && errorMsg.length() > 0) { 406 System.err.println("ERROR: " + errorMsg); 407 } 408 System.err.println("Usage: " + NAME + " [options] <WAL inputdir> [<tables> <tableMappings>]"); 409 System.err.println(" <WAL inputdir> directory of WALs to replay."); 410 System.err.println(" <tables> comma separated list of tables. If no tables specified,"); 411 System.err.println(" all are imported (even hbase:meta if present)."); 412 System.err.println(" <tableMappings> WAL entries can be mapped to a new set of tables by " + 413 "passing"); 414 System.err.println(" <tableMappings>, a comma separated list of target " + 415 "tables."); 416 System.err.println(" If specified, each table in <tables> must have a " + 417 "mapping."); 418 System.err.println("To generate HFiles to bulk load instead of loading HBase directly, pass:"); 419 System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); 420 System.err.println(" Only one table can be specified, and no mapping allowed!"); 421 System.err.println("To specify a time range, pass:"); 422 System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]"); 423 System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]"); 424 System.err.println(" The start and the end date of timerange (inclusive). The dates can be"); 425 System.err.println(" expressed in milliseconds-since-epoch or yyyy-MM-dd'T'HH:mm:ss.SS " + 426 "format."); 427 System.err.println(" E.g. 1234567890120 or 2009-02-13T23:32:30.12"); 428 System.err.println("Other options:"); 429 System.err.println(" -D" + JOB_NAME_CONF_KEY + "=jobName"); 430 System.err.println(" Use the specified mapreduce job name for the wal player"); 431 System.err.println(" -Dwal.input.separator=' '"); 432 System.err.println(" Change WAL filename separator (WAL dir names use default ','.)"); 433 System.err.println("For performance also consider the following options:\n" 434 + " -Dmapreduce.map.speculative=false\n" 435 + " -Dmapreduce.reduce.speculative=false"); 436 } 437 438 /** 439 * Main entry point. 440 * 441 * @param args The command line parameters. 442 * @throws Exception When running the job fails. 443 */ 444 public static void main(String[] args) throws Exception { 445 int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args); 446 System.exit(ret); 447 } 448 449 @Override 450 public int run(String[] args) throws Exception { 451 if (args.length < 1) { 452 usage("Wrong number of arguments: " + args.length); 453 System.exit(-1); 454 } 455 Job job = createSubmittableJob(args); 456 return job.waitForCompletion(true) ? 0 : 1; 457 } 458}