View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.text.ParseException;
22  import java.text.SimpleDateFormat;
23  import java.util.Map;
24  import java.util.TreeMap;
25  
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.classification.InterfaceStability;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.conf.Configured;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HBaseConfiguration;
34  import org.apache.hadoop.hbase.KeyValue;
35  import org.apache.hadoop.hbase.KeyValueUtil;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.client.Delete;
38  import org.apache.hadoop.hbase.client.HTable;
39  import org.apache.hadoop.hbase.client.Mutation;
40  import org.apache.hadoop.hbase.client.Put;
41  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
42  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
43  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.mapreduce.Job;
46  import org.apache.hadoop.mapreduce.Mapper;
47  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
48  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
49  import org.apache.hadoop.util.GenericOptionsParser;
50  import org.apache.hadoop.util.Tool;
51  import org.apache.hadoop.util.ToolRunner;
52  
53  /**
54   * A tool to replay WAL files as a M/R job.
55   * The WAL can be replayed for a set of tables or all tables,
56   * and a timerange can be provided (in milliseconds).
57   * The WAL is filtered to the passed set of tables and  the output
58   * can optionally be mapped to another set of tables.
59   *
60   * WAL replay can also generate HFiles for later bulk importing,
61   * in that case the WAL is replayed for a single table only.
62   */
63  @InterfaceAudience.Public
64  @InterfaceStability.Stable
65  public class WALPlayer extends Configured implements Tool {
66    final static String NAME = "WALPlayer";
67    final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
68    final static String HLOG_INPUT_KEY = "hlog.input.dir";
69    final static String TABLES_KEY = "hlog.input.tables";
70    final static String TABLE_MAP_KEY = "hlog.input.tablesmap";
71  
72    private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
73  
74    /**
75     * A mapper that just writes out KeyValues.
76     * This one can be used together with {@link KeyValueSortReducer}
77     */
78    static class HLogKeyValueMapper
79    extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
80      private byte[] table;
81  
82      @Override
83      public void map(HLogKey key, WALEdit value,
84        Context context)
85      throws IOException {
86        try {
87          // skip all other tables
88          if (Bytes.equals(table, key.getTablename().getName())) {
89            for (Cell cell : value.getCells()) {
90              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
91              if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
92              context.write(new ImmutableBytesWritable(kv.getRow()), kv);
93            }
94          }
95        } catch (InterruptedException e) {
96          e.printStackTrace();
97        }
98      }
99  
100     @Override
101     public void setup(Context context) throws IOException {
102       // only a single table is supported when HFiles are generated with HFileOutputFormat
103       String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
104       if (tables == null || tables.length != 1) {
105         // this can only happen when HLogMapper is used directly by a class other than WALPlayer
106         throw new IOException("Exactly one table must be specified for bulk HFile case.");
107       }
108       table = Bytes.toBytes(tables[0]);
109     }
110   }
111 
112   /**
113    * A mapper that writes out {@link Mutation} to be directly applied to
114    * a running HBase instance.
115    */
116   static class HLogMapper
117   extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
118     private Map<TableName, TableName> tables =
119         new TreeMap<TableName, TableName>();
120 
121     @Override
122     public void map(HLogKey key, WALEdit value,
123       Context context)
124     throws IOException {
125       try {
126         if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
127           TableName targetTable = tables.isEmpty() ?
128                 key.getTablename() :
129                 tables.get(key.getTablename());
130           ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
131           Put put = null;
132           Delete del = null;
133           Cell lastCell = null;
134           for (Cell cell : value.getCells()) {
135             // filtering HLog meta entries
136             if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
137 
138             // A WALEdit may contain multiple operations (HBASE-3584) and/or
139             // multiple rows (HBASE-5229).
140             // Aggregate as much as possible into a single Put/Delete
141             // operation before writing to the context.
142             if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
143                 || !CellUtil.matchingRow(lastCell, cell)) {
144               // row or type changed, write out aggregate KVs.
145               if (put != null) context.write(tableOut, put);
146               if (del != null) context.write(tableOut, del);
147 
148               if (CellUtil.isDelete(cell)) {
149                 del = new Delete(cell.getRow());
150               } else {
151                 put = new Put(cell.getRow());
152               }
153             }
154             if (CellUtil.isDelete(cell)) {
155               del.addDeleteMarker(cell);
156             } else {
157               put.add(cell);
158             }
159             lastCell = cell;
160           }
161           // write residual KVs
162           if (put != null) context.write(tableOut, put);
163           if (del != null) context.write(tableOut, del);
164         }
165       } catch (InterruptedException e) {
166         e.printStackTrace();
167       }
168     }
169 
170     @Override
171     public void setup(Context context) throws IOException {
172       String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
173       String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
174       if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
175         // this can only happen when HLogMapper is used directly by a class other than WALPlayer
176         throw new IOException("No tables or incorrect table mapping specified.");
177       }
178       int i = 0;
179       for (String table : tablesToUse) {
180         tables.put(TableName.valueOf(table),
181             TableName.valueOf(tableMap[i++]));
182       }
183     }
184   }
185 
186   /**
187    * @param conf The {@link Configuration} to use.
188    */
189   public WALPlayer(Configuration conf) {
190     super(conf);
191   }
192 
193   void setupTime(Configuration conf, String option) throws IOException {
194     String val = conf.get(option);
195     if (val == null) return;
196     long ms;
197     try {
198       // first try to parse in user friendly form
199       ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
200     } catch (ParseException pe) {
201       try {
202         // then see if just a number of ms's was specified
203         ms = Long.parseLong(val);
204       } catch (NumberFormatException nfe) {
205         throw new IOException(option
206             + " must be specified either in the form 2001-02-20T16:35:06.99 "
207             + "or as number of milliseconds");
208       }
209     }
210     conf.setLong(option, ms);
211   }
212 
213   /**
214    * Sets up the actual job.
215    *
216    * @param args  The command line parameters.
217    * @return The newly created job.
218    * @throws IOException When setting up the job fails.
219    */
220   public Job createSubmittableJob(String[] args)
221   throws IOException {
222     Configuration conf = getConf();
223     setupTime(conf, HLogInputFormat.START_TIME_KEY);
224     setupTime(conf, HLogInputFormat.END_TIME_KEY);
225     Path inputDir = new Path(args[0]);
226     String[] tables = args[1].split(",");
227     String[] tableMap;
228     if (args.length > 2) {
229       tableMap = args[2].split(",");
230       if (tableMap.length != tables.length) {
231         throw new IOException("The same number of tables and mapping must be provided.");
232       }
233     } else {
234       // if not mapping is specified map each table to itself
235       tableMap = tables;
236     }
237     conf.setStrings(TABLES_KEY, tables);
238     conf.setStrings(TABLE_MAP_KEY, tableMap);
239     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + inputDir));
240     job.setJarByClass(WALPlayer.class);
241     FileInputFormat.setInputPaths(job, inputDir);
242     job.setInputFormatClass(HLogInputFormat.class);
243     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
244     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
245     if (hfileOutPath != null) {
246       // the bulk HFile case
247       if (tables.length != 1) {
248         throw new IOException("Exactly one table must be specified for the bulk export option");
249       }
250       HTable table = new HTable(conf, TableName.valueOf(tables[0]));
251       job.setMapperClass(HLogKeyValueMapper.class);
252       job.setReducerClass(KeyValueSortReducer.class);
253       Path outputDir = new Path(hfileOutPath);
254       FileOutputFormat.setOutputPath(job, outputDir);
255       job.setMapOutputValueClass(KeyValue.class);
256       HFileOutputFormat.configureIncrementalLoad(job, table);
257       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
258           com.google.common.base.Preconditions.class);
259     } else {
260       // output to live cluster
261       job.setMapperClass(HLogMapper.class);
262       job.setOutputFormatClass(MultiTableOutputFormat.class);
263       TableMapReduceUtil.addDependencyJars(job);
264       TableMapReduceUtil.initCredentials(job);
265       // No reducers.
266       job.setNumReduceTasks(0);
267     }
268     return job;
269   }
270 
271   /*
272    * @param errorMsg Error message.  Can be null.
273    */
274   private void usage(final String errorMsg) {
275     if (errorMsg != null && errorMsg.length() > 0) {
276       System.err.println("ERROR: " + errorMsg);
277     }
278     System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
279     System.err.println("Read all WAL entries for <tables>.");
280     System.err.println("If no tables (\"\") are specific, all tables are imported.");
281     System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)");
282     System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
283     System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
284     System.err.println("<tableMapping> is a command separated list of targettables.");
285     System.err.println("If specified, each table in <tables> must have a mapping.\n");
286     System.err.println("By default " + NAME + " will load data directly into HBase.");
287     System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
288     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
289     System.err.println("  (Only one table can be specified, and no mapping is allowed!)");
290     System.err.println("Other options: (specify time range to WAL edit to consider)");
291     System.err.println("  -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]");
292     System.err.println("  -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]");
293     System.err.println("   -D " + JOB_NAME_CONF_KEY
294         + "=jobName - use the specified mapreduce job name for the wal player");
295     System.err.println("For performance also consider the following options:\n"
296         + "  -Dmapreduce.map.speculative=false\n"
297         + "  -Dmapreduce.reduce.speculative=false");
298   }
299 
300   /**
301    * Main entry point.
302    *
303    * @param args  The command line parameters.
304    * @throws Exception When running the job fails.
305    */
306   public static void main(String[] args) throws Exception {
307     int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
308     System.exit(ret);
309   }
310 
311   @Override
312   public int run(String[] args) throws Exception {
313     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
314     if (otherArgs.length < 2) {
315       usage("Wrong number of arguments: " + otherArgs.length);
316       System.exit(-1);
317     }
318     Job job = createSubmittableJob(otherArgs);
319     return job.waitForCompletion(true) ? 0 : 1;
320   }
321 }