1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
48  
49  /**
50   * A tool to replay WAL files as a M/R job.
51   * The WAL can be replayed for a set of tables or all tables,
52   * and a timerange can be provided (in milliseconds).
53   * The WAL is filtered to the passed set of tables and  the output
54   * can optionally be mapped to another set of tables.
55   *
56   * WAL replay can also generate HFiles for later bulk importing,
57   * in that case the WAL is replayed for a single table only.
58   */
59  @InterfaceAudience.Public
60  @InterfaceStability.Stable
61  public class WALPlayer extends Configured implements Tool {
62    final static String NAME = "WALPlayer";
63    final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
64    final static String HLOG_INPUT_KEY = "hlog.input.dir";
65    final static String TABLES_KEY = "hlog.input.tables";
66    final static String TABLE_MAP_KEY = "hlog.input.tablesmap";
67  
68    /**
69     * A mapper that just writes out KeyValues.
70     * This one can be used together with {@link KeyValueSortReducer}
71     */
72    static class HLogKeyValueMapper
73    extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
74      private byte[] table;
75  
76      @Override
77      public void map(HLogKey key, WALEdit value,
78        Context context)
79      throws IOException {
80        try {
81          // skip all other tables
82          if (Bytes.equals(table, key.getTablename())) {
83            for (KeyValue kv : value.getKeyValues()) {
84              if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
85              context.write(new ImmutableBytesWritable(kv.getRow()), kv);
86            }
87          }
88        } catch (InterruptedException e) {
89          e.printStackTrace();
90        }
91      }
92  
93      @Override
94      public void setup(Context context) throws IOException {
95        // only a single table is supported when HFiles are generated with HFileOutputFormat
96        String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
97        if (tables == null || tables.length != 1) {
98          // this can only happen when HLogMapper is used directly by a class other than WALPlayer
99          throw new IOException("Exactly one table must be specified for bulk HFile case.");
100       }
101       table = Bytes.toBytes(tables[0]);
102     }
103   }
104 
105   /**
106    * A mapper that writes out {@link Mutation} to be directly applied to
107    * a running HBase instance.
108    */
109   static class HLogMapper
110   extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
111     private Map<byte[], byte[]> tables = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
112 
113     @Override
114     public void map(HLogKey key, WALEdit value,
115       Context context)
116     throws IOException {
117       try {
118         if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
119           byte[] targetTable = tables.isEmpty() ?
120                 key.getTablename() :
121                 tables.get(key.getTablename());
122           ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable);
123           Put put = null;
124           Delete del = null;
125           KeyValue lastKV = null;
126           for (KeyValue kv : value.getKeyValues()) {
127             // filtering HLog meta entries
128             if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
129 
130             // A WALEdit may contain multiple operations (HBASE-3584) and/or
131             // multiple rows (HBASE-5229).
132             // Aggregate as much as possible into a single Put/Delete
133             // operation before writing to the context.
134             if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
135               // row or type changed, write out aggregate KVs.
136               if (put != null) context.write(tableOut, put);
137               if (del != null) context.write(tableOut, del);
138 
139               if (kv.isDelete()) {
140                 del = new Delete(kv.getRow());
141               } else {
142                 put = new Put(kv.getRow());
143               }
144             }
145             if (kv.isDelete()) {
146               del.addDeleteMarker(kv);
147             } else {
148               put.add(kv);
149             }
150             lastKV = kv;
151           }
152           // write residual KVs
153           if (put != null) context.write(tableOut, put);
154           if (del != null) context.write(tableOut, del);
155         }
156       } catch (InterruptedException e) {
157         e.printStackTrace();
158       }
159     }
160 
161     @Override
162     public void setup(Context context) throws IOException {
163       String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
164       String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
165       if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
166         // this can only happen when HLogMapper is used directly by a class other than WALPlayer
167         throw new IOException("No tables or incorrect table mapping specified.");
168       }
169       int i = 0;
170       for (String table : tablesToUse) {
171         tables.put(Bytes.toBytes(table), Bytes.toBytes(tableMap[i++]));
172       }
173     }
174   }
175 
176   /**
177    * @param conf The {@link Configuration} to use.
178    */
179   public WALPlayer(Configuration conf) {
180     super(conf);
181   }
182 
183   void setupTime(Configuration conf, String option) throws IOException {
184     String val = conf.get(option);
185     if (val == null) return;
186     long ms;
187     try {
188       // first try to parse in user friendly form
189       ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
190     } catch (ParseException pe) {
191       try {
192         // then see if just a number of ms's was specified
193         ms = Long.parseLong(val);
194       } catch (NumberFormatException nfe) {
195         throw new IOException(option
196             + " must be specified either in the form 2001-02-20T16:35:06.99 "
197             + "or as number of milliseconds");
198       }
199     }
200     conf.setLong(option, ms);
201   }
202 
203   /**
204    * Sets up the actual job.
205    *
206    * @param args  The command line parameters.
207    * @return The newly created job.
208    * @throws IOException When setting up the job fails.
209    */
210   public Job createSubmittableJob(String[] args)
211   throws IOException {
212     Configuration conf = getConf();
213     setupTime(conf, HLogInputFormat.START_TIME_KEY);
214     setupTime(conf, HLogInputFormat.END_TIME_KEY);
215     Path inputDir = new Path(args[0]);
216     String[] tables = args[1].split(",");
217     String[] tableMap;
218     if (args.length > 2) {
219       tableMap = args[2].split(",");
220       if (tableMap.length != tables.length) {
221         throw new IOException("The same number of tables and mapping must be provided.");
222       }
223     } else {
224       // if not mapping is specified map each table to itself
225       tableMap = tables;
226     }
227     conf.setStrings(TABLES_KEY, tables);
228     conf.setStrings(TABLE_MAP_KEY, tableMap);
229     Job job = new Job(conf, NAME + "_" + inputDir);
230     job.setJarByClass(WALPlayer.class);
231     FileInputFormat.setInputPaths(job, inputDir);
232     job.setInputFormatClass(HLogInputFormat.class);
233     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
234     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
235     if (hfileOutPath != null) {
236       // the bulk HFile case
237       if (tables.length != 1) {
238         throw new IOException("Exactly one table must be specified for the bulk export option");
239       }
240       HTable table = new HTable(conf, tables[0]);
241       job.setMapperClass(HLogKeyValueMapper.class);
242       job.setReducerClass(KeyValueSortReducer.class);
243       Path outputDir = new Path(hfileOutPath);
244       FileOutputFormat.setOutputPath(job, outputDir);
245       job.setMapOutputValueClass(KeyValue.class);
246       HFileOutputFormat.configureIncrementalLoad(job, table);
247       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
248           com.google.common.base.Preconditions.class);
249     } else {
250       // output to live cluster
251       job.setMapperClass(HLogMapper.class);
252       job.setOutputFormatClass(MultiTableOutputFormat.class);
253       TableMapReduceUtil.addDependencyJars(job);
254       // No reducers.
255       job.setNumReduceTasks(0);
256     }
257     return job;
258   }
259 
260   /*
261    * @param errorMsg Error message.  Can be null.
262    */
263   private void usage(final String errorMsg) {
264     if (errorMsg != null && errorMsg.length() > 0) {
265       System.err.println("ERROR: " + errorMsg);
266     }
267     System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
268     System.err.println("Read all WAL entries for <tables>.");
269     System.err.println("If no tables (\"\") are specific, all tables are imported.");
270     System.err.println("(Careful, even -ROOT- and .META. entries will be imported in that case.)");
271     System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
272     System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
273     System.err.println("<tableMapping> is a command separated list of targettables.");
274     System.err.println("If specified, each table in <tables> must have a mapping.\n");
275     System.err.println("By default " + NAME + " will load data directly into HBase.");
276     System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
277     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
278     System.err.println("  (Only one table can be specified, and no mapping is allowed!)");
279     System.err.println("Other options: (specify time range to WAL edit to consider)");
280     System.err.println("  -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]");
281     System.err.println("  -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]");
282     System.err.println("For performance also consider the following options:\n"
283         + "  -Dmapred.map.tasks.speculative.execution=false\n"
284         + "  -Dmapred.reduce.tasks.speculative.execution=false");
285   }
286 
287   /**
288    * Main entry point.
289    *
290    * @param args  The command line parameters.
291    * @throws Exception When running the job fails.
292    */
293   public static void main(String[] args) throws Exception {
294     int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
295     System.exit(ret);
296   }
297 
298   @Override
299   public int run(String[] args) throws Exception {
300     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
301     if (otherArgs.length < 2) {
302       usage("Wrong number of arguments: " + otherArgs.length);
303       System.exit(-1);
304     }
305     Job job = createSubmittableJob(otherArgs);
306     return job.waitForCompletion(true) ? 0 : 1;
307   }
308 }