View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
47  
/**
 * A tool to replay WAL files as a M/R job.
 * The WAL can be replayed for a set of tables or all tables,
 * and a timerange can be provided (in milliseconds).
 * The WAL is filtered to the passed set of tables and  the output
 * can optionally be mapped to another set of tables.
 *
 * WAL replay can also generate HFiles for later bulk importing,
 * in that case the WAL is replayed for a single table only.
 */
public class WALPlayer extends Configured implements Tool {
  final static String NAME = "WALPlayer";
  // When set, output HFiles for bulk loading instead of writing to a live cluster.
  final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
  // Directory containing the WAL files to replay.
  final static String HLOG_INPUT_KEY = "hlog.input.dir";
  // Comma-separated list of source tables whose edits are replayed.
  final static String TABLES_KEY = "hlog.input.tables";
  // Comma-separated list of target tables, parallel to TABLES_KEY.
  final static String TABLE_MAP_KEY = "hlog.input.tablesmap";
65    /**
66     * A mapper that just writes out KeyValues.
67     * This one can be used together with {@link KeyValueSortReducer}
68     */
69    static class HLogKeyValueMapper
70    extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
71      private byte[] table;
72  
73      @Override
74      public void map(HLogKey key, WALEdit value,
75        Context context)
76      throws IOException {
77        try {
78          // skip all other tables
79          if (Bytes.equals(table, key.getTablename())) {
80            for (KeyValue kv : value.getKeyValues()) {
81              if (HLog.isMetaFamily(kv.getFamily())) continue;
82              context.write(new ImmutableBytesWritable(kv.getRow()), kv);
83            }
84          }
85        } catch (InterruptedException e) {
86          e.printStackTrace();
87        }
88      }
89  
90      @Override
91      public void setup(Context context) throws IOException {
92        // only a single table is supported when HFiles are generated with HFileOutputFormat
93        String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
94        if (tables == null || tables.length != 1) {
95          // this can only happen when HLogMapper is used directly by a class other than WALPlayer
96          throw new IOException("Exactly one table must be specified for bulk HFile case.");
97        }
98        table = Bytes.toBytes(tables[0]);
99      }
100   }
101 
102   /**
103    * A mapper that writes out {@link Mutation} to be directly applied to
104    * a running HBase instance.
105    */
106   static class HLogMapper
107   extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
108     private Map<byte[], byte[]> tables = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
109 
110     @Override
111     public void map(HLogKey key, WALEdit value,
112       Context context)
113     throws IOException {
114       try {
115         if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
116           byte[] targetTable = tables.isEmpty() ?
117                 key.getTablename() :
118                 tables.get(key.getTablename());
119           ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable);
120           Put put = null;
121           Delete del = null;
122           KeyValue lastKV = null;
123           for (KeyValue kv : value.getKeyValues()) {
124             // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
125             if (HLog.isMetaFamily(kv.getFamily())) continue;
126 
127             // A WALEdit may contain multiple operations (HBASE-3584) and/or
128             // multiple rows (HBASE-5229).
129             // Aggregate as much as possible into a single Put/Delete
130             // operation before writing to the context.
131             if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
132               // row or type changed, write out aggregate KVs.
133               if (put != null) context.write(tableOut, put);
134               if (del != null) context.write(tableOut, del);
135 
136               if (kv.isDelete()) {
137                 del = new Delete(kv.getRow());
138               } else {
139                 put = new Put(kv.getRow());
140               }
141             }
142             if (kv.isDelete()) {
143               del.addDeleteMarker(kv);
144             } else {
145               put.add(kv);
146             }
147             lastKV = kv;
148           }
149           // write residual KVs
150           if (put != null) context.write(tableOut, put);
151           if (del != null) context.write(tableOut, del);
152         }
153       } catch (InterruptedException e) {
154         e.printStackTrace();
155       }
156     }
157 
158     @Override
159     public void setup(Context context) throws IOException {
160       String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
161       String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
162       if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
163         // this can only happen when HLogMapper is used directly by a class other than WALPlayer
164         throw new IOException("No tables or incorrect table mapping specified.");
165       }
166       int i = 0;
167       for (String table : tablesToUse) {
168         tables.put(Bytes.toBytes(table), Bytes.toBytes(tableMap[i++]));
169       }
170     }
171   }
172 
  /**
   * Creates a new WALPlayer tool bound to the given configuration.
   *
   * @param conf The {@link Configuration} to use.
   */
  public WALPlayer(Configuration conf) {
    super(conf);
  }
179 
180   void setupTime(Configuration conf, String option) throws IOException {
181     String val = conf.get(option);
182     if (val == null) return;
183     long ms;
184     try {
185       // first try to parse in user friendly form
186       ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
187     } catch (ParseException pe) {
188       try {
189         // then see if just a number of ms's was specified
190         ms = Long.parseLong(val);
191       } catch (NumberFormatException nfe) {
192         throw new IOException(option
193             + " must be specified either in the form 2001-02-20T16:35:06.99 "
194             + "or as number of milliseconds");
195       }
196     }
197     conf.setLong(option, ms);
198   }
199 
  /**
   * Sets up the actual job.
   *
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args)
  throws IOException {
    Configuration conf = getConf();
    // normalize the optional time-range options to milliseconds up front
    setupTime(conf, HLogInputFormat.START_TIME_KEY);
    setupTime(conf, HLogInputFormat.END_TIME_KEY);
    Path inputDir = new Path(args[0]);
    String[] tables = args[1].split(",");
    String[] tableMap;
    if (args.length > 2) {
      tableMap = args[2].split(",");
      if (tableMap.length != tables.length) {
        throw new IOException("The same number of tables and mapping must be provided.");
      }
    } else {
      // if no mapping is specified map each table to itself
      tableMap = tables;
    }
    conf.setStrings(TABLES_KEY, tables);
    conf.setStrings(TABLE_MAP_KEY, tableMap);
    Job job = new Job(conf, NAME + "_" + inputDir);
    job.setJarByClass(WALPlayer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(HLogInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
      // the bulk HFile case
      if (tables.length != 1) {
        throw new IOException("Exactly one table must be specified for the bulk export option");
      }
      // NOTE(review): this HTable handle is never closed — presumably fine for a
      // short-lived job-submission JVM, but confirm whether it should be closed
      // after configureIncrementalLoad below.
      HTable table = new HTable(conf, tables[0]);
      job.setMapperClass(HLogKeyValueMapper.class);
      // sort KeyValues so HFileOutputFormat receives them in order
      job.setReducerClass(KeyValueSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(KeyValue.class);
      HFileOutputFormat.configureIncrementalLoad(job, table);
      // ship the jar containing Guava's Preconditions with the job;
      // it is needed by the bulk-load output path
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
          com.google.common.base.Preconditions.class);
    } else {
      // output to live cluster
      job.setMapperClass(HLogMapper.class);
      job.setOutputFormatClass(MultiTableOutputFormat.class);
      TableMapReduceUtil.addDependencyJars(job);
      // No reducers.
      job.setNumReduceTasks(0);
    }
    return job;
  }
256 
257   /*
258    * @param errorMsg Error message.  Can be null.
259    */
260   private void usage(final String errorMsg) {
261     if (errorMsg != null && errorMsg.length() > 0) {
262       System.err.println("ERROR: " + errorMsg);
263     }
264     System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
265     System.err.println("Read all WAL entries for <tables>.");
266     System.err.println("If no tables (\"\") are specific, all tables are imported.");
267     System.err.println("(Careful, even -ROOT- and .META. entries will be imported in that case.)");
268     System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
269     System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
270     System.err.println("<tableMapping> is a command separated list of targettables.");
271     System.err.println("If specified, each table in <tables> must have a mapping.\n");
272     System.err.println("By default " + NAME + " will load data directly into HBase.");
273     System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
274     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
275     System.err.println("  (Only one table can be specified, and no mapping is allowed!)");
276     System.err.println("Other options: (specify time range to WAL edit to consider)");
277     System.err.println("  -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]");
278     System.err.println("  -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]");
279     System.err.println("For performance also consider the following options:\n"
280         + "  -Dmapred.map.tasks.speculative.execution=false\n"
281         + "  -Dmapred.reduce.tasks.speculative.execution=false");
282   }
283 
284   /**
285    * Main entry point.
286    *
287    * @param args  The command line parameters.
288    * @throws Exception When running the job fails.
289    */
290   public static void main(String[] args) throws Exception {
291     int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
292     System.exit(ret);
293   }
294 
295   @Override
296   public int run(String[] args) throws Exception {
297     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
298     if (otherArgs.length < 2) {
299       usage("Wrong number of arguments: " + otherArgs.length);
300       System.exit(-1);
301     }
302     Job job = createSubmittableJob(otherArgs);
303     return job.waitForCompletion(true) ? 0 : 1;
304   }
305 }