/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.TableInfo;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A tool to replay WAL files as an M/R job. The WAL can be replayed for a set of tables or all
 * tables, and a time range can be provided (in milliseconds). The WAL is filtered to the passed
 * set of tables, and the output can optionally be mapped to another set of tables.
 * <p>
 * WAL replay can also generate HFiles for later bulk importing; in that case the WAL is replayed
 * for a single table only.
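 * <p>
 * Example invocations (the paths and table names below are illustrative, not defaults):
 *
 * <pre>
 * # replay WALs into a live cluster, mapping table1 onto table1_copy
 * hbase org.apache.hadoop.hbase.mapreduce.WALPlayer /hbase/oldWALs table1 table1_copy
 *
 * # replay WALs for one table into HFiles for later bulk loading
 * hbase org.apache.hadoop.hbase.mapreduce.WALPlayer -Dwal.bulk.output=/tmp/walplayer-out \
 *   /hbase/oldWALs table1
 * </pre>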
 */
@InterfaceAudience.Public
public class WALPlayer extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(WALPlayer.class);
  final static String NAME = "WALPlayer";
  public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
  public final static String TABLES_KEY = "wal.input.tables";
  public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
  public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
  public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
  public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support";

  protected static final String tableSeparator = ";";

  // This relies on Hadoop Configuration to handle warning about deprecated configs and
  // to set the correct non-deprecated configs when an old one shows up.
  static {
    Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY);
    Configuration.addDeprecation("hlog.input.tables", TABLES_KEY);
    Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY);
  }

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public WALPlayer() {
  }

  protected WALPlayer(final Configuration c) {
    super(c);
  }

  /**
   * A mapper that just writes out KeyValues. This one can be used together with
   * {@link CellSortReducer}.
   */
  static class WALKeyValueMapper extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
    private Set<String> tableSet = new HashSet<String>();
    private boolean multiTableSupport = false;

    @Override
    public void map(WALKey key, WALEdit value, Context context) throws IOException {
      try {
        // skip all other tables
        TableName table = key.getTableName();
        if (tableSet.contains(table.getNameAsString())) {
          for (Cell cell : value.getCells()) {
            if (WALEdit.isMetaEditFamily(cell)) {
              continue;
            }
            byte[] outKey = multiTableSupport
              ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
              : CellUtil.cloneRow(cell);
            context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      String[] tables = conf.getStrings(TABLES_KEY);
      this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false);
      for (String table : tables) {
        tableSet.add(table);
      }
    }
  }

  /**
   * Enum for map metrics. Keep it out here rather than inside the Map inner-class so we can find
   * associated properties.
   */
  protected static enum Counter {
    /** Number of aggregated writes */
    PUTS,
    /** Number of aggregated deletes */
    DELETES,
    /** Number of cells read from the WAL */
    CELLS_READ,
    /** Number of cells written out */
    CELLS_WRITTEN,
    /** Number of WALEdits processed */
    WALEDITS
  }

  /**
   * A mapper that writes out {@link Mutation} to be directly applied to a running HBase instance.
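   * <p>
   * The protected {@code filter(Context, Cell)} hook lets a subclass drop cells before they are
   * aggregated into mutations. A minimal sketch of such a subclass (the class and column family
   * names are illustrative; it assumes access to this protected class, e.g. from the same
   * package):
   *
   * <pre>
   * public class FamilyFilteringWALMapper extends WALPlayer.WALMapper {
   *   // illustrative column family; keep only cells belonging to it
   *   private static final byte[] FAMILY = Bytes.toBytes("cf");
   *
   *   &#64;Override
   *   protected boolean filter(Context context, Cell cell) {
   *     return CellUtil.matchingFamily(cell, FAMILY);
   *   }
   * }
   * </pre>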
   */
  protected static class WALMapper
    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
    private Map<TableName, TableName> tables = new TreeMap<>();

    @Override
    public void map(WALKey key, WALEdit value, Context context) throws IOException {
      context.getCounter(Counter.WALEDITS).increment(1);
      try {
        if (tables.isEmpty() || tables.containsKey(key.getTableName())) {
          TableName targetTable =
            tables.isEmpty() ? key.getTableName() : tables.get(key.getTableName());
          ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
          Put put = null;
          Delete del = null;
          Cell lastCell = null;
          for (Cell cell : value.getCells()) {
            context.getCounter(Counter.CELLS_READ).increment(1);
            // Filter out WAL meta marker entries.
            if (WALEdit.isMetaEditFamily(cell)) {
              continue;
            }
            // Allow a subclass to filter out this cell.
            if (filter(context, cell)) {
              // A WALEdit may contain multiple operations (HBASE-3584) and/or
              // multiple rows (HBASE-5229).
              // Aggregate as much as possible into a single Put/Delete
              // operation before writing to the context.
              if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
                || !CellUtil.matchingRows(lastCell, cell)) {
                // row or type changed, write out aggregate KVs.
                if (put != null) {
                  context.write(tableOut, put);
                  context.getCounter(Counter.PUTS).increment(1);
                }
                if (del != null) {
                  context.write(tableOut, del);
                  context.getCounter(Counter.DELETES).increment(1);
                }
                if (CellUtil.isDelete(cell)) {
                  del = new Delete(CellUtil.cloneRow(cell));
                } else {
                  put = new Put(CellUtil.cloneRow(cell));
                }
              }
              if (CellUtil.isDelete(cell)) {
                del.add(cell);
              } else {
                put.add(cell);
              }
              context.getCounter(Counter.CELLS_WRITTEN).increment(1);
            }
            lastCell = cell;
          }
          // write residual KVs
          if (put != null) {
            context.write(tableOut, put);
            context.getCounter(Counter.PUTS).increment(1);
          }
          if (del != null) {
            context.getCounter(Counter.DELETES).increment(1);
            context.write(tableOut, del);
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing results", e);
        Thread.currentThread().interrupt();
      }
    }

    protected boolean filter(Context context, final Cell cell) {
      return true;
    }

    @Override
    protected void
      cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
        throws IOException, InterruptedException {
      super.cleanup(context);
    }

    @SuppressWarnings("checkstyle:EmptyBlock")
    @Override
    public void setup(Context context) throws IOException {
      String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
      String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
      if (tableMap == null) {
        tableMap = tablesToUse;
      }
      if (tablesToUse == null) {
        // Then user wants all tables.
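        // Leaving the tables map empty makes map() pass every WAL entry through to its
        // original table.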
      } else if (tablesToUse.length != tableMap.length) {
        // this can only happen when WALMapper is used directly by a class other than WALPlayer
        throw new IOException("Incorrect table mapping specified.");
      }
      int i = 0;
      if (tablesToUse != null) {
        for (String table : tablesToUse) {
          tables.put(TableName.valueOf(table), TableName.valueOf(tableMap[i++]));
        }
      }
    }
  }

  /**
   * Parses a time option given either in the form yyyy-MM-dd'T'HH:mm:ss.SS or as milliseconds
   * since the epoch, and normalizes it to milliseconds in the configuration.
   */
  void setupTime(Configuration conf, String option) throws IOException {
    String val = conf.get(option);
    if (null == val) {
      return;
    }
    long ms;
    try {
      // first try to parse in user friendly form
      ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
    } catch (ParseException pe) {
      try {
        // then see if just a number of ms's was specified
        ms = Long.parseLong(val);
      } catch (NumberFormatException nfe) {
        throw new IOException(
          option + " must be specified either in the form 2001-02-20T16:35:06.99 "
            + "or as number of milliseconds");
      }
    }
    conf.setLong(option, ms);
  }

  /**
   * Sets up the actual job.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args) throws IOException {
    Configuration conf = getConf();
    setupTime(conf, WALInputFormat.START_TIME_KEY);
    setupTime(conf, WALInputFormat.END_TIME_KEY);
    String inputDirs = args[0];
    String[] tables = args.length == 1 ? new String[] {} : args[1].split(",");
    String[] tableMap;
    if (args.length > 2) {
      tableMap = args[2].split(",");
      if (tableMap.length != tables.length) {
        throw new IOException("The same number of tables and mappings must be provided.");
      }
    } else {
      // if no mapping is specified, map each table to itself
      tableMap = tables;
    }
    conf.setStrings(TABLES_KEY, tables);
    conf.setStrings(TABLE_MAP_KEY, tableMap);
    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
    Job job =
      Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
    job.setJarByClass(WALPlayer.class);

    job.setInputFormatClass(WALInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);

    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
      LOG.debug("Adding incremental job: " + hfileOutPath + " from " + inputDirs);

      // the bulk HFile case
      List<TableName> tableNames = getTableNameList(tables);

      job.setMapperClass(WALKeyValueMapper.class);
      job.setReducerClass(CellSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(MapReduceExtendedCell.class);
      try (Connection conn = ConnectionFactory.createConnection(conf)) {
        List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
        for (TableName tableName : tableNames) {
          Table table = conn.getTable(tableName);
          RegionLocator regionLocator = conn.getRegionLocator(tableName);
          tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator));
        }
        MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList);
      }
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
    } else {
      // output to live cluster
      job.setMapperClass(WALMapper.class);
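      // MultiTableOutputFormat routes each emitted Mutation to the table named by the map
      // output key; WALMapper writes the target table's name as that key.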
      job.setOutputFormatClass(MultiTableOutputFormat.class);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.initCredentials(job);
      // No reducers.
      job.setNumReduceTasks(0);
    }
    String codecCls = WALCellCodec.getWALCellCodecClass(conf).getName();
    try {
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        Class.forName(codecCls));
    } catch (Exception e) {
      throw new IOException("Cannot determine wal codec class " + codecCls, e);
    }
    return job;
  }

  private List<TableName> getTableNameList(String[] tables) {
    List<TableName> list = new ArrayList<TableName>();
    for (String name : tables) {
      list.add(TableName.valueOf(name));
    }
    return list;
  }

  /**
   * Print usage.
   * @param errorMsg Error message. Can be null.
   */
  private void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: " + NAME + " [options] <WAL inputdir> [<tables> <tableMappings>]");
    System.err.println(" <WAL inputdir>   directory of WALs to replay.");
    System.err.println(" <tables>         comma separated list of tables. If no tables specified,");
    System.err.println("                  all are imported (even hbase:meta if present).");
    System.err.println(
      " <tableMappings>  WAL entries can be mapped to a new set of tables by passing");
    System.err
      .println("                  <tableMappings>, a comma separated list of target tables.");
    System.err
      .println("                  If specified, each table in <tables> must have a mapping.");
    System.err.println("To generate HFiles to bulk load instead of loading HBase directly, pass:");
    System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println(" Only one table can be specified, and no mapping is allowed!");
    System.err.println("To specify a time range, pass:");
    System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
    System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
    System.err.println(" The start and the end date of timerange (inclusive). The dates can be");
    System.err
      .println(" expressed in milliseconds-since-epoch or yyyy-MM-dd'T'HH:mm:ss.SS format.");
    System.err.println(" E.g. 1234567890120 or 2009-02-13T23:32:30.12");
    System.err.println("Other options:");
    System.err.println(" -D" + JOB_NAME_CONF_KEY + "=jobName");
    System.err.println(" Use the specified mapreduce job name for the wal player");
    System.err.println(" -Dwal.input.separator=' '");
    System.err.println(" Change WAL filename separator (WAL dir names use default ','.)");
    System.err.println("For performance also consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 1) {
      usage("Wrong number of arguments: " + args.length);
      System.exit(-1);
    }
    Job job = createSubmittableJob(args);
    return job.waitForCompletion(true) ? 0 : 1;
  }
}