1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.hadoop.conf.Configured;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.hbase.Cell;
26 import org.apache.hadoop.hbase.CellUtil;
27 import org.apache.hadoop.hbase.HBaseConfiguration;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.KeyValueUtil;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.classification.InterfaceStability;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.Delete;
36 import org.apache.hadoop.hbase.client.Mutation;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.RegionLocator;
39 import org.apache.hadoop.hbase.client.Table;
40 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
41 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.hbase.wal.WALKey;
44 import org.apache.hadoop.mapreduce.Job;
45 import org.apache.hadoop.mapreduce.Mapper;
46 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
47 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
48 import org.apache.hadoop.util.GenericOptionsParser;
49 import org.apache.hadoop.util.Tool;
50 import org.apache.hadoop.util.ToolRunner;
51
52 import java.io.IOException;
53 import java.text.ParseException;
54 import java.text.SimpleDateFormat;
55 import java.util.Map;
56 import java.util.TreeMap;
57
58
59
60
61
62
63
64
65
66
67
68 @InterfaceAudience.Public
69 @InterfaceStability.Stable
70 public class WALPlayer extends Configured implements Tool {
/** Logger for this tool. */
static final Log LOG = LogFactory.getLog(WALPlayer.class);
/** Tool name; used as the job-name prefix and in the usage text. */
static final String NAME = "WALPlayer";
/** Configuration key holding the HFile output path; when set the job writes HFiles. */
static final String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
/** Configuration key holding the source tables to replay. */
static final String TABLES_KEY = "wal.input.tables";
/** Configuration key holding the target table for each entry of {@link #TABLES_KEY}. */
static final String TABLE_MAP_KEY = "wal.input.tablesmap";

// Register the former "hlog" key names as deprecated aliases of the current keys.
static {
  final String[][] renamedKeys = {
    { "hlog.bulk.output", BULK_OUTPUT_CONF_KEY },
    { "hlog.input.tables", TABLES_KEY },
    { "hlog.input.tablesmap", TABLE_MAP_KEY },
    { HLogInputFormat.START_TIME_KEY, WALInputFormat.START_TIME_KEY },
    { HLogInputFormat.END_TIME_KEY, WALInputFormat.END_TIME_KEY },
  };
  for (String[] renamed : renamedKeys) {
    // renamed[0] is the deprecated key, renamed[1] its replacement.
    Configuration.addDeprecation(renamed[0], renamed[1]);
  }
}
86
87
88
89
90
91 static class WALKeyValueMapper
92 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
93 private byte[] table;
94
95 @Override
96 public void map(WALKey key, WALEdit value,
97 Context context)
98 throws IOException {
99 try {
100
101 if (Bytes.equals(table, key.getTablename().getName())) {
102 for (Cell cell : value.getCells()) {
103 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
104 if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
105 context.write(new ImmutableBytesWritable(kv.getRow()), kv);
106 }
107 }
108 } catch (InterruptedException e) {
109 e.printStackTrace();
110 }
111 }
112
113 @Override
114 public void setup(Context context) throws IOException {
115
116 String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
117 if (tables == null || tables.length != 1) {
118
119 throw new IOException("Exactly one table must be specified for bulk HFile case.");
120 }
121 table = Bytes.toBytes(tables[0]);
122 }
123 }
124
125
126
127
128
129 protected static class WALMapper
130 extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
131 private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>();
132
133 @Override
134 public void map(WALKey key, WALEdit value, Context context)
135 throws IOException {
136 try {
137 if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
138 TableName targetTable = tables.isEmpty() ?
139 key.getTablename() :
140 tables.get(key.getTablename());
141 ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
142 Put put = null;
143 Delete del = null;
144 Cell lastCell = null;
145 for (Cell cell : value.getCells()) {
146
147 if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
148
149
150 if (filter(context, cell)) {
151
152
153
154
155 if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
156 || !CellUtil.matchingRow(lastCell, cell)) {
157
158 if (put != null) context.write(tableOut, put);
159 if (del != null) context.write(tableOut, del);
160 if (CellUtil.isDelete(cell)) {
161 del = new Delete(cell.getRow());
162 } else {
163 put = new Put(cell.getRow());
164 }
165 }
166 if (CellUtil.isDelete(cell)) {
167 del.addDeleteMarker(cell);
168 } else {
169 put.add(cell);
170 }
171 }
172 lastCell = cell;
173 }
174
175 if (put != null) context.write(tableOut, put);
176 if (del != null) context.write(tableOut, del);
177 }
178 } catch (InterruptedException e) {
179 e.printStackTrace();
180 }
181 }
182
183
184
185
186
187 protected boolean filter(Context context, final Cell cell) {
188 return true;
189 }
190
191 @Override
192 public void setup(Context context) throws IOException {
193 String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
194 String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
195 if (tablesToUse == null && tableMap == null) {
196
197 } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
198
199 throw new IOException("No tables or incorrect table mapping specified.");
200 }
201 int i = 0;
202 if (tablesToUse != null) {
203 for (String table : tablesToUse) {
204 tables.put(TableName.valueOf(table),
205 TableName.valueOf(tableMap[i++]));
206 }
207 }
208 }
209 }
210
211
212
213
/**
 * Creates a player backed by the given configuration.
 *
 * @param conf configuration passed on to the {@link Configured} base class
 */
public WALPlayer(final Configuration conf) {
  super(conf);
}
217
218 void setupTime(Configuration conf, String option) throws IOException {
219 String val = conf.get(option);
220 if (null == val) return;
221 long ms;
222 try {
223
224 ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
225 } catch (ParseException pe) {
226 try {
227
228 ms = Long.parseLong(val);
229 } catch (NumberFormatException nfe) {
230 throw new IOException(option
231 + " must be specified either in the form 2001-02-20T16:35:06.99 "
232 + "or as number of milliseconds");
233 }
234 }
235 conf.setLong(option, ms);
236 }
237
238
239
240
241
242
243
244
245 public Job createSubmittableJob(String[] args)
246 throws IOException {
247 Configuration conf = getConf();
248 setupTime(conf, HLogInputFormat.START_TIME_KEY);
249 setupTime(conf, HLogInputFormat.END_TIME_KEY);
250 Path inputDir = new Path(args[0]);
251 String[] tables = args[1].split(",");
252 String[] tableMap;
253 if (args.length > 2) {
254 tableMap = args[2].split(",");
255 if (tableMap.length != tables.length) {
256 throw new IOException("The same number of tables and mapping must be provided.");
257 }
258 } else {
259
260 tableMap = tables;
261 }
262 conf.setStrings(TABLES_KEY, tables);
263 conf.setStrings(TABLE_MAP_KEY, tableMap);
264 Job job = new Job(conf, NAME + "_" + inputDir);
265 job.setJarByClass(WALPlayer.class);
266 FileInputFormat.setInputPaths(job, inputDir);
267 job.setInputFormatClass(WALInputFormat.class);
268 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
269 String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
270 if (hfileOutPath != null) {
271
272 if (tables.length != 1) {
273 throw new IOException("Exactly one table must be specified for the bulk export option");
274 }
275 TableName tableName = TableName.valueOf(tables[0]);
276 job.setMapperClass(WALKeyValueMapper.class);
277 job.setReducerClass(KeyValueSortReducer.class);
278 Path outputDir = new Path(hfileOutPath);
279 FileOutputFormat.setOutputPath(job, outputDir);
280 job.setMapOutputValueClass(KeyValue.class);
281 try (Connection conn = ConnectionFactory.createConnection(conf);
282 Table table = conn.getTable(tableName);
283 RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
284 HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
285 }
286 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
287 com.google.common.base.Preconditions.class);
288 } else {
289
290 job.setMapperClass(WALMapper.class);
291 job.setOutputFormatClass(MultiTableOutputFormat.class);
292 TableMapReduceUtil.addDependencyJars(job);
293 TableMapReduceUtil.initCredentials(job);
294
295 job.setNumReduceTasks(0);
296 }
297 return job;
298 }
299
300
301
302
303 private void usage(final String errorMsg) {
304 if (errorMsg != null && errorMsg.length() > 0) {
305 System.err.println("ERROR: " + errorMsg);
306 }
307 System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
308 System.err.println("Read all WAL entries for <tables>.");
309 System.err.println("If no tables (\"\") are specific, all tables are imported.");
310 System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)");
311 System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
312 System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
313 System.err.println("<tableMapping> is a command separated list of targettables.");
314 System.err.println("If specified, each table in <tables> must have a mapping.\n");
315 System.err.println("By default " + NAME + " will load data directly into HBase.");
316 System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
317 System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
318 System.err.println(" (Only one table can be specified, and no mapping is allowed!)");
319 System.err.println("Other options: (specify time range to WAL edit to consider)");
320 System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
321 System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
322 System.err.println("For performance also consider the following options:\n"
323 + " -Dmapreduce.map.speculative=false\n"
324 + " -Dmapreduce.reduce.speculative=false");
325 }
326
327
328
329
330
331
332
333 public static void main(String[] args) throws Exception {
334 int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
335 System.exit(ret);
336 }
337
338 @Override
339 public int run(String[] args) throws Exception {
340 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
341 if (otherArgs.length < 2) {
342 usage("Wrong number of arguments: " + otherArgs.length);
343 System.exit(-1);
344 }
345 Job job = createSubmittableJob(otherArgs);
346 return job.waitForCompletion(true) ? 0 : 1;
347 }
348 }