1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.hadoop.conf.Configured;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.hbase.Cell;
26 import org.apache.hadoop.hbase.CellUtil;
27 import org.apache.hadoop.hbase.HBaseConfiguration;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.KeyValueUtil;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.classification.InterfaceStability;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.Delete;
36 import org.apache.hadoop.hbase.client.Mutation;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.RegionLocator;
39 import org.apache.hadoop.hbase.client.Table;
40 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
41 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
42 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.wal.WALKey;
45 import org.apache.hadoop.mapreduce.Job;
46 import org.apache.hadoop.mapreduce.Mapper;
47 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
48 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
49 import org.apache.hadoop.util.GenericOptionsParser;
50 import org.apache.hadoop.util.Tool;
51 import org.apache.hadoop.util.ToolRunner;
52
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TreeMap;
58
59
60
61
62
63
64
65
66
67
68
69 @InterfaceAudience.Public
70 @InterfaceStability.Stable
71 public class WALPlayer extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(WALPlayer.class);

/** Tool name; shown in the usage text and used as the job-name prefix. */
static final String NAME = "WALPlayer";

/** Configuration key holding the HFile output path; when set, the bulk-output path is taken. */
static final String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";

/** Configuration key holding the comma separated list of source tables. */
static final String TABLES_KEY = "wal.input.tables";

/** Configuration key holding the comma separated list of target tables. */
static final String TABLE_MAP_KEY = "wal.input.tablesmap";

// Register the older "hlog.*" spellings (and the HLogInputFormat time-range keys)
// as deprecated aliases of the current WAL keys.
static {
  Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY);
  Configuration.addDeprecation("hlog.input.tables", TABLES_KEY);
  Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY);
  Configuration.addDeprecation(HLogInputFormat.START_TIME_KEY, WALInputFormat.START_TIME_KEY);
  Configuration.addDeprecation(HLogInputFormat.END_TIME_KEY, WALInputFormat.END_TIME_KEY);
}
87
88
89
90
91
92 static class WALKeyValueMapper
93 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
94 private byte[] table;
95
96 @Override
97 public void map(WALKey key, WALEdit value,
98 Context context)
99 throws IOException {
100 try {
101
102 if (Bytes.equals(table, key.getTablename().getName())) {
103 for (Cell cell : value.getCells()) {
104 KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
105 if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
106 context.write(new ImmutableBytesWritable(kv.getRow()), kv);
107 }
108 }
109 } catch (InterruptedException e) {
110 e.printStackTrace();
111 }
112 }
113
114 @Override
115 public void setup(Context context) throws IOException {
116
117 String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
118 if (tables == null || tables.length != 1) {
119
120 throw new IOException("Exactly one table must be specified for bulk HFile case.");
121 }
122 table = Bytes.toBytes(tables[0]);
123 }
124 }
125
126
127
128
129
130 protected static class WALMapper
131 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
132 private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>();
133
134 @Override
135 public void map(WALKey key, WALEdit value, Context context)
136 throws IOException {
137 try {
138 if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
139 TableName targetTable = tables.isEmpty() ?
140 key.getTablename() :
141 tables.get(key.getTablename());
142 ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
143 Put put = null;
144 Delete del = null;
145 Cell lastCell = null;
146 for (Cell cell : value.getCells()) {
147
148 if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
149
150
151 if (filter(context, cell)) {
152
153
154
155
156 if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
157 || !CellUtil.matchingRow(lastCell, cell)) {
158
159 if (put != null) context.write(tableOut, put);
160 if (del != null) context.write(tableOut, del);
161 if (CellUtil.isDelete(cell)) {
162 del = new Delete(cell.getRow());
163 } else {
164 put = new Put(cell.getRow());
165 }
166 }
167 if (CellUtil.isDelete(cell)) {
168 del.addDeleteMarker(cell);
169 } else {
170 put.add(cell);
171 }
172 }
173 lastCell = cell;
174 }
175
176 if (put != null) context.write(tableOut, put);
177 if (del != null) context.write(tableOut, del);
178 }
179 } catch (InterruptedException e) {
180 e.printStackTrace();
181 }
182 }
183
184
185
186
187
188 protected boolean filter(Context context, final Cell cell) {
189 return true;
190 }
191
192 @Override
193 public void setup(Context context) throws IOException {
194 String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
195 String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
196 if (tablesToUse == null && tableMap == null) {
197
198 } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
199
200 throw new IOException("No tables or incorrect table mapping specified.");
201 }
202 int i = 0;
203 if (tablesToUse != null) {
204 for (String table : tablesToUse) {
205 tables.put(TableName.valueOf(table),
206 TableName.valueOf(tableMap[i++]));
207 }
208 }
209 }
210 }
211
212
213
214
/**
 * Creates a player that uses the given configuration.
 *
 * @param conf the configuration passed on to {@link Configured}
 */
public WALPlayer(Configuration conf) {
  super(conf);
}
218
219 void setupTime(Configuration conf, String option) throws IOException {
220 String val = conf.get(option);
221 if (null == val) return;
222 long ms;
223 try {
224
225 ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
226 } catch (ParseException pe) {
227 try {
228
229 ms = Long.parseLong(val);
230 } catch (NumberFormatException nfe) {
231 throw new IOException(option
232 + " must be specified either in the form 2001-02-20T16:35:06.99 "
233 + "or as number of milliseconds");
234 }
235 }
236 conf.setLong(option, ms);
237 }
238
239
240
241
242
243
244
245
246 public Job createSubmittableJob(String[] args)
247 throws IOException {
248 Configuration conf = getConf();
249 setupTime(conf, HLogInputFormat.START_TIME_KEY);
250 setupTime(conf, HLogInputFormat.END_TIME_KEY);
251 Path inputDir = new Path(args[0]);
252 String[] tables = args[1].split(",");
253 String[] tableMap;
254 if (args.length > 2) {
255 tableMap = args[2].split(",");
256 if (tableMap.length != tables.length) {
257 throw new IOException("The same number of tables and mapping must be provided.");
258 }
259 } else {
260
261 tableMap = tables;
262 }
263 conf.setStrings(TABLES_KEY, tables);
264 conf.setStrings(TABLE_MAP_KEY, tableMap);
265 Job job = new Job(conf, NAME + "_" + inputDir);
266 job.setJarByClass(WALPlayer.class);
267 FileInputFormat.setInputPaths(job, inputDir);
268 job.setInputFormatClass(WALInputFormat.class);
269 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
270 String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
271 if (hfileOutPath != null) {
272
273 if (tables.length != 1) {
274 throw new IOException("Exactly one table must be specified for the bulk export option");
275 }
276 TableName tableName = TableName.valueOf(tables[0]);
277 job.setMapperClass(WALKeyValueMapper.class);
278 job.setReducerClass(KeyValueSortReducer.class);
279 Path outputDir = new Path(hfileOutPath);
280 FileOutputFormat.setOutputPath(job, outputDir);
281 job.setMapOutputValueClass(KeyValue.class);
282 try (Connection conn = ConnectionFactory.createConnection(conf);
283 Table table = conn.getTable(tableName);
284 RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
285 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
286 }
287 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
288 com.google.common.base.Preconditions.class);
289 } else {
290
291 job.setMapperClass(WALMapper.class);
292 job.setOutputFormatClass(MultiTableOutputFormat.class);
293 TableMapReduceUtil.addDependencyJars(job);
294 TableMapReduceUtil.initCredentials(job);
295
296 job.setNumReduceTasks(0);
297 }
298 String codecCls = WALCellCodec.getWALCellCodecClass(conf).getName();
299 try {
300 TableMapReduceUtil.addDependencyJars(conf, Class.forName(codecCls));
301 } catch (Exception e) {
302 throw new IOException("Cannot determine wal codec class " + codecCls, e);
303 }
304 return job;
305 }
306
307
308
309
310 private void usage(final String errorMsg) {
311 if (errorMsg != null && errorMsg.length() > 0) {
312 System.err.println("ERROR: " + errorMsg);
313 }
314 System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
315 System.err.println("Read all WAL entries for <tables>.");
316 System.err.println("If no tables (\"\") are specific, all tables are imported.");
317 System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)");
318 System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
319 System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
320 System.err.println("<tableMapping> is a command separated list of targettables.");
321 System.err.println("If specified, each table in <tables> must have a mapping.\n");
322 System.err.println("By default " + NAME + " will load data directly into HBase.");
323 System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
324 System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
325 System.err.println(" (Only one table can be specified, and no mapping is allowed!)");
326 System.err.println("Other options: (specify time range to WAL edit to consider)");
327 System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
328 System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
329 System.err.println("For performance also consider the following options:\n"
330 + " -Dmapreduce.map.speculative=false\n"
331 + " -Dmapreduce.reduce.speculative=false");
332 }
333
334
335
336
337
338
339
340 public static void main(String[] args) throws Exception {
341 int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
342 System.exit(ret);
343 }
344
345 @Override
346 public int run(String[] args) throws Exception {
347 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
348 if (otherArgs.length < 2) {
349 usage("Wrong number of arguments: " + otherArgs.length);
350 System.exit(-1);
351 }
352 Job job = createSubmittableJob(otherArgs);
353 return job.waitForCompletion(true) ? 0 : 1;
354 }
355 }