1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.text.ParseException;
22 import java.text.SimpleDateFormat;
23 import java.util.Map;
24 import java.util.TreeMap;
25
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.conf.Configured;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.HBaseConfiguration;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.client.Delete;
34 import org.apache.hadoop.hbase.client.HTable;
35 import org.apache.hadoop.hbase.client.Mutation;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
38 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
39 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.mapreduce.Job;
42 import org.apache.hadoop.mapreduce.Mapper;
43 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
44 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
45 import org.apache.hadoop.util.GenericOptionsParser;
46 import org.apache.hadoop.util.Tool;
47 import org.apache.hadoop.util.ToolRunner;
48
49
50
51
52
53
54
55
56
57
58
59 @InterfaceAudience.Public
60 @InterfaceStability.Stable
61 public class WALPlayer extends Configured implements Tool {
62 final static String NAME = "WALPlayer";
63 final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
64 final static String HLOG_INPUT_KEY = "hlog.input.dir";
65 final static String TABLES_KEY = "hlog.input.tables";
66 final static String TABLE_MAP_KEY = "hlog.input.tablesmap";
67
68
69
70
71
72 static class HLogKeyValueMapper
73 extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
74 private byte[] table;
75
76 @Override
77 public void map(HLogKey key, WALEdit value,
78 Context context)
79 throws IOException {
80 try {
81
82 if (Bytes.equals(table, key.getTablename())) {
83 for (KeyValue kv : value.getKeyValues()) {
84 if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
85 context.write(new ImmutableBytesWritable(kv.getRow()), kv);
86 }
87 }
88 } catch (InterruptedException e) {
89 e.printStackTrace();
90 }
91 }
92
93 @Override
94 public void setup(Context context) throws IOException {
95
96 String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
97 if (tables == null || tables.length != 1) {
98
99 throw new IOException("Exactly one table must be specified for bulk HFile case.");
100 }
101 table = Bytes.toBytes(tables[0]);
102 }
103 }
104
105
106
107
108
109 static class HLogMapper
110 extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
111 private Map<byte[], byte[]> tables = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
112
113 @Override
114 public void map(HLogKey key, WALEdit value,
115 Context context)
116 throws IOException {
117 try {
118 if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
119 byte[] targetTable = tables.isEmpty() ?
120 key.getTablename() :
121 tables.get(key.getTablename());
122 ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable);
123 Put put = null;
124 Delete del = null;
125 KeyValue lastKV = null;
126 for (KeyValue kv : value.getKeyValues()) {
127
128 if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
129
130
131
132
133
134 if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
135
136 if (put != null) context.write(tableOut, put);
137 if (del != null) context.write(tableOut, del);
138
139 if (kv.isDelete()) {
140 del = new Delete(kv.getRow());
141 } else {
142 put = new Put(kv.getRow());
143 }
144 }
145 if (kv.isDelete()) {
146 del.addDeleteMarker(kv);
147 } else {
148 put.add(kv);
149 }
150 lastKV = kv;
151 }
152
153 if (put != null) context.write(tableOut, put);
154 if (del != null) context.write(tableOut, del);
155 }
156 } catch (InterruptedException e) {
157 e.printStackTrace();
158 }
159 }
160
161 @Override
162 public void setup(Context context) throws IOException {
163 String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
164 String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
165 if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
166
167 throw new IOException("No tables or incorrect table mapping specified.");
168 }
169 int i = 0;
170 for (String table : tablesToUse) {
171 tables.put(Bytes.toBytes(table), Bytes.toBytes(tableMap[i++]));
172 }
173 }
174 }
175
176
177
178
/**
 * Creates a player that uses the given configuration.
 *
 * @param conf configuration handed to {@link Configured}
 */
public WALPlayer(final Configuration conf) {
  super(conf);
}
182
183 void setupTime(Configuration conf, String option) throws IOException {
184 String val = conf.get(option);
185 if (val == null) return;
186 long ms;
187 try {
188
189 ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
190 } catch (ParseException pe) {
191 try {
192
193 ms = Long.parseLong(val);
194 } catch (NumberFormatException nfe) {
195 throw new IOException(option
196 + " must be specified either in the form 2001-02-20T16:35:06.99 "
197 + "or as number of milliseconds");
198 }
199 }
200 conf.setLong(option, ms);
201 }
202
203
204
205
206
207
208
209
210 public Job createSubmittableJob(String[] args)
211 throws IOException {
212 Configuration conf = getConf();
213 setupTime(conf, HLogInputFormat.START_TIME_KEY);
214 setupTime(conf, HLogInputFormat.END_TIME_KEY);
215 Path inputDir = new Path(args[0]);
216 String[] tables = args[1].split(",");
217 String[] tableMap;
218 if (args.length > 2) {
219 tableMap = args[2].split(",");
220 if (tableMap.length != tables.length) {
221 throw new IOException("The same number of tables and mapping must be provided.");
222 }
223 } else {
224
225 tableMap = tables;
226 }
227 conf.setStrings(TABLES_KEY, tables);
228 conf.setStrings(TABLE_MAP_KEY, tableMap);
229 Job job = new Job(conf, NAME + "_" + inputDir);
230 job.setJarByClass(WALPlayer.class);
231 FileInputFormat.setInputPaths(job, inputDir);
232 job.setInputFormatClass(HLogInputFormat.class);
233 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
234 String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
235 if (hfileOutPath != null) {
236
237 if (tables.length != 1) {
238 throw new IOException("Exactly one table must be specified for the bulk export option");
239 }
240 HTable table = new HTable(conf, tables[0]);
241 job.setMapperClass(HLogKeyValueMapper.class);
242 job.setReducerClass(KeyValueSortReducer.class);
243 Path outputDir = new Path(hfileOutPath);
244 FileOutputFormat.setOutputPath(job, outputDir);
245 job.setMapOutputValueClass(KeyValue.class);
246 HFileOutputFormat.configureIncrementalLoad(job, table);
247 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
248 com.google.common.base.Preconditions.class);
249 } else {
250
251 job.setMapperClass(HLogMapper.class);
252 job.setOutputFormatClass(MultiTableOutputFormat.class);
253 TableMapReduceUtil.addDependencyJars(job);
254
255 job.setNumReduceTasks(0);
256 }
257 return job;
258 }
259
260
261
262
263 private void usage(final String errorMsg) {
264 if (errorMsg != null && errorMsg.length() > 0) {
265 System.err.println("ERROR: " + errorMsg);
266 }
267 System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
268 System.err.println("Read all WAL entries for <tables>.");
269 System.err.println("If no tables (\"\") are specific, all tables are imported.");
270 System.err.println("(Careful, even -ROOT- and .META. entries will be imported in that case.)");
271 System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
272 System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
273 System.err.println("<tableMapping> is a command separated list of targettables.");
274 System.err.println("If specified, each table in <tables> must have a mapping.\n");
275 System.err.println("By default " + NAME + " will load data directly into HBase.");
276 System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
277 System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
278 System.err.println(" (Only one table can be specified, and no mapping is allowed!)");
279 System.err.println("Other options: (specify time range to WAL edit to consider)");
280 System.err.println(" -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]");
281 System.err.println(" -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]");
282 System.err.println("For performance also consider the following options:\n"
283 + " -Dmapred.map.tasks.speculative.execution=false\n"
284 + " -Dmapred.reduce.tasks.speculative.execution=false");
285 }
286
287
288
289
290
291
292
293 public static void main(String[] args) throws Exception {
294 int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
295 System.exit(ret);
296 }
297
298 @Override
299 public int run(String[] args) throws Exception {
300 String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
301 if (otherArgs.length < 2) {
302 usage("Wrong number of arguments: " + otherArgs.length);
303 System.exit(-1);
304 }
305 Job job = createSubmittableJob(otherArgs);
306 return job.waitForCompletion(true) ? 0 : 1;
307 }
308 }