1   /**
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapreduce;
21  
22  import java.io.IOException;
23  import java.lang.reflect.InvocationTargetException;
24  import java.lang.reflect.Method;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.TreeMap;
29  import java.util.UUID;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HBaseConfiguration;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
38  import org.apache.hadoop.hbase.client.Delete;
39  import org.apache.hadoop.hbase.client.HConnection;
40  import org.apache.hadoop.hbase.client.HConnectionManager;
41  import org.apache.hadoop.hbase.client.HTable;
42  import org.apache.hadoop.hbase.client.Mutation;
43  import org.apache.hadoop.hbase.client.Put;
44  import org.apache.hadoop.hbase.client.Result;
45  import org.apache.hadoop.hbase.filter.Filter;
46  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
47  import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
50  import org.apache.hadoop.mapreduce.Job;
51  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
52  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
53  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
54  import org.apache.hadoop.util.GenericOptionsParser;
55  import org.apache.zookeeper.KeeperException;
56  
57  /**
58   * Import data written by {@link Export}.
59   */
60  public class Import {
61    private static final Log LOG = LogFactory.getLog(Import.class);
62    final static String NAME = "import";
63    final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
64    final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
65    final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
66    final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
67  
68    // Optional filter to use for mappers
69    private static Filter filter;
70  
71    /**
72     * A mapper that just writes out KeyValues.
73     */
74    static class KeyValueImporter
75    extends TableMapper<ImmutableBytesWritable, KeyValue> {
76      private Map<byte[], byte[]> cfRenameMap;
77        
78      /**
79       * @param row  The current table row key.
80       * @param value  The columns.
81       * @param context  The current context.
82       * @throws IOException When something is broken with the data.
83       * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
84       *   org.apache.hadoop.mapreduce.Mapper.Context)
85       */
86      @Override
87      public void map(ImmutableBytesWritable row, Result value,
88        Context context)
89      throws IOException {
90        try {
91          if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
92            for (KeyValue kv : value.raw()) {
93              kv = filterKv(kv);
94              // skip if we filtered it out
95              if (kv == null) continue;
96    
97              context.write(row, convertKv(kv, cfRenameMap));
98            }
99          }
100       } catch (InterruptedException e) {
101         e.printStackTrace();
102       }
103     }
104 
105     @Override
106     public void setup(Context context) {
107       cfRenameMap = createCfRenameMap(context.getConfiguration());
108       filter = instantiateFilter(context.getConfiguration());
109     }
110   }
111 
112   /**
113    * A mapper that writes the exported table content back out as {@link Put} and {@link Delete} mutations.
114    */
115   static class Importer
116   extends TableMapper<ImmutableBytesWritable, Mutation> {
117     private Map<byte[], byte[]> cfRenameMap;
118     private UUID clusterId;
119       
120     /**
121      * @param row  The current table row key.
122      * @param value  The columns.
123      * @param context  The current context.
124      * @throws IOException When something is broken with the data.
125      * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
126      *   org.apache.hadoop.mapreduce.Mapper.Context)
127      */
128     @Override
129     public void map(ImmutableBytesWritable row, Result value,
130       Context context)
131     throws IOException {
132       try {
133         writeResult(row, value, context);
134       } catch (InterruptedException e) {
135         e.printStackTrace();
136       }
137     }
138 
139     private void writeResult(ImmutableBytesWritable key, Result result, Context context)
140     throws IOException, InterruptedException {
141       Put put = null;
142       Delete delete = null;
143       if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
144         for (KeyValue kv : result.raw()) {
145           kv = filterKv(kv);
146           // skip if we filter it out
147           if (kv == null) continue;
148   
149           kv = convertKv(kv, cfRenameMap);
150           // Deletes and Puts are gathered and written when finished
151           /*
152            * If an Export contains a sequence of mutations and tombstones, the Import should restore
153            * that sequence exactly as it was. If we combined all Delete tombstones into a single
154            * request, some DeleteFamily tombstones could be lost: when multiple DeleteFamily tombstones
155            * are submitted in one Delete request, HBase keeps only the newest one and ignores the
156            * others. See HBASE-12065.
157            */
158           if (kv.isDeleteFamily()) {
159             Delete deleteFamily = new Delete(key.get());
160             deleteFamily.addDeleteMarker(kv);
161             deleteFamily.setClusterId(clusterId);
162             context.write(key, deleteFamily);
163           } else if (kv.isDelete()) {
164             if (delete == null) {
165               delete = new Delete(key.get());
166             }
167             delete.addDeleteMarker(kv);
168           } else {
169             if (put == null) { 
170               put = new Put(key.get());
171             }
172             put.add(kv);
173           }
174         }
175         if (put != null) {
176           put.setClusterId(clusterId);
177           context.write(key, put);
178         }
179         if (delete != null) {
180           delete.setClusterId(clusterId);
181           context.write(key, delete);
182         }
183       }
184     }
185 
186     @Override
187     public void setup(Context context) {
188       Configuration conf = context.getConfiguration();
189       cfRenameMap = createCfRenameMap(conf);
190       filter = instantiateFilter(conf);
191 
192       try {
193         HConnection connection = HConnectionManager.getConnection(conf);
194         ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
195         ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
196         clusterId = zkHelper.getUUIDForCluster(zkw);
197       } catch (ZooKeeperConnectionException e) {
198         LOG.error("Problem connecting to ZooKeeper during task setup", e);
199       } catch (KeeperException e) {
200         LOG.error("Problem reading ZooKeeper data during task setup", e);
201       } catch (IOException e) {
202         LOG.error("Problem setting up task", e);
203       }
204 
205     }
206   }
207 
208   /**
209    * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}), so that
210    * filtered keys can be excluded from the job output.
211    * @param conf {@link Configuration} from which to load the filter
212    * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
213    * @throws IllegalArgumentException if the filter is misconfigured
214    */
215   private static Filter instantiateFilter(Configuration conf) {
216     // get the filter, if it was configured
217     Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
218     if (filterClass == null) {
219       LOG.debug("No configured filter class, accepting all keyvalues.");
220       return null;
221     }
222     LOG.debug("Attempting to create filter:" + filterClass);
223 
224     try {
225       Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
226       return (Filter) m.invoke(null, getFilterArgs(conf));
227     } catch (IllegalAccessException e) {
228       LOG.error("Couldn't instantiate filter!", e);
229       throw new RuntimeException(e);
230     } catch (SecurityException e) {
231       LOG.error("Couldn't instantiate filter!", e);
232       throw new RuntimeException(e);
233     } catch (NoSuchMethodException e) {
234       LOG.error("Couldn't instantiate filter!", e);
235       throw new RuntimeException(e);
236     } catch (IllegalArgumentException e) {
237       LOG.error("Couldn't instantiate filter!", e);
238       throw new RuntimeException(e);
239     } catch (InvocationTargetException e) {
240       LOG.error("Couldn't instantiate filter!", e);
241       throw new RuntimeException(e);
242     }
243   }
244 
245   private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
246     ArrayList<byte[]> args = new ArrayList<byte[]>();
247     // an empty default guards against an NPE when no filter args were configured
248     String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY, new String[0]);
249     for (String arg : sargs) {
250       // the filters' instantiation methods expect quoted args (they come from the shell), so add quotes here
251       args.add(Bytes.toBytes("'" + arg + "'"));
252     }
253     return args;
254   }
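
  /*
   * A hedged sketch of the factory contract instantiateFilter relies on: the configured class must
   * expose a public static createFilterFromArguments(ArrayList) method, as the shell-facing HBase
   * filters do. The RowPrefixFilter name below is purely illustrative, not an existing class, and it
   * assumes FilterBase and ParseFilter (org.apache.hadoop.hbase.filter) are available in this version.
   *
   *   public class RowPrefixFilter extends FilterBase {
   *     private final byte[] prefix;
   *     public RowPrefixFilter(byte[] prefix) { this.prefix = prefix; }
   *
   *     // true means "filter out this row"; Import only calls filterRowKey and filterKeyValue
   *     @Override
   *     public boolean filterRowKey(byte[] buf, int offset, int length) {
   *       byte[] row = java.util.Arrays.copyOfRange(buf, offset, offset + length);
   *       return !Bytes.startsWith(row, prefix);
   *     }
   *
   *     // invoked reflectively by instantiateFilter(conf); args arrive quoted, see getFilterArgs
   *     public static Filter createFilterFromArguments(ArrayList<byte[]> args) {
   *       return new RowPrefixFilter(ParseFilter.removeQuotesFromByteArray(args.get(0)));
   *     }
   *   }
   */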
255 
256   /**
257    * Attempt to filter out the keyvalue
258    * @param kv {@link KeyValue} on which to apply the filter
259    * @return <tt>null</tt> if the key should not be written, otherwise returns the original
260    *         {@link KeyValue}
261    */
262   private static KeyValue filterKv(KeyValue kv) {
263     // apply the filter and skip this kv if the filter doesn't apply
264     if (filter != null) {
265       Filter.ReturnCode code = filter.filterKeyValue(kv);
266       if (LOG.isTraceEnabled()) {
267         LOG.trace("Filter returned:" + code + " for the key value:" + kv);
268       }
269       // if it's not an accept type, then skip this kv
270       if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
271           .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
272         return null;
273       }
274     }
275     return kv;
276   }
277 
278   // helper: create a new KeyValue based on CF rename map
279   private static KeyValue convertKv(KeyValue kv, Map<byte[], byte[]> cfRenameMap) {
280     if(cfRenameMap != null) {
281       // If there's a rename mapping for this CF, create a new KeyValue
282       byte[] newCfName = cfRenameMap.get(kv.getFamily());
283       if(newCfName != null) {
284           kv = new KeyValue(kv.getBuffer(), // row buffer 
285                   kv.getRowOffset(),        // row offset
286                   kv.getRowLength(),        // row length
287                   newCfName,                // CF buffer
288                   0,                        // CF offset 
289                   newCfName.length,         // CF length 
290                   kv.getBuffer(),           // qualifier buffer
291                   kv.getQualifierOffset(),  // qualifier offset
292                   kv.getQualifierLength(),  // qualifier length
293                   kv.getTimestamp(),        // timestamp
294                   KeyValue.Type.codeToType(kv.getType()), // KV Type
295                   kv.getBuffer(),           // value buffer 
296                   kv.getValueOffset(),      // value offset
297                   kv.getValueLength());     // value length
298       }
299     }
300     return kv;
301   }
302 
303   // helper: make a map from sourceCfName to destCfName by parsing a config key
304   private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
305     Map<byte[], byte[]> cfRenameMap = null;
306     String allMappingsPropVal = conf.get(CF_RENAME_PROP);
307     if(allMappingsPropVal != null) {
308       // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
309       String[] allMappings = allMappingsPropVal.split(",");
310       for (String mapping: allMappings) {
311         if(cfRenameMap == null) {
312             cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
313         }
314         String [] srcAndDest = mapping.split(":");
315         if(srcAndDest.length != 2) {
316             continue;
317         }
318         cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
319       }
320     }
321     return cfRenameMap;
322   }
323 
324   /**
325    * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
326    * the mapper how to rename column families.
327    * 
328    * <p>Alternatively, instead of calling this function, you could set the configuration key
329    * {@link #CF_RENAME_PROP} yourself. The value should look like
330    * <pre>srcCf1:destCf1,srcCf2:destCf2,...</pre>. This would have the same effect on
331    * the mapper behavior.
332    * 
333    * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
334    *  set
335    * @param renameMap a mapping from source CF names to destination CF names
336    */
337   static public void configureCfRenaming(Configuration conf, 
338           Map<String, String> renameMap) {
339     StringBuilder sb = new StringBuilder();
340     for(Map.Entry<String,String> entry: renameMap.entrySet()) {
341       String sourceCf = entry.getKey();
342       String destCf = entry.getValue();
343 
344       if(sourceCf.contains(":") || sourceCf.contains(",") || 
345               destCf.contains(":") || destCf.contains(",")) {
346         throw new IllegalArgumentException("Illegal character in CF names: " 
347               + sourceCf + ", " + destCf);
348       }
349 
350       if(sb.length() != 0) {
351         sb.append(",");
352       }
353       sb.append(sourceCf + ":" + destCf);
354     }
355     conf.set(CF_RENAME_PROP, sb.toString());
356   }
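
  /*
   * A minimal usage sketch for the renaming hook above, assuming a driver that wants data from a
   * family named "cf_old" written into "cf_new"; the family names here are illustrative only.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Map<String, String> renames = new java.util.HashMap<String, String>();
   *   renames.put("cf_old", "cf_new");
   *   Import.configureCfRenaming(conf, renames);
   *   // equivalent to setting CF_RENAME_PROP ("HBASE_IMPORTER_RENAME_CFS") to "cf_old:cf_new"
   */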
357   
358   /**
359    * Add a Filter to be instantiated on import
360    * @param conf Configuration to update (will be passed to the job)
361    * @param clazz {@link Filter} subclass to instantiate on the server.
362    * @param args List of arguments to pass to the filter on instantiation
363    */
364   public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
365       List<String> args) {
366     conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
367 
368     // build the param string for the key
369     StringBuilder builder = new StringBuilder();
370     for (int i = 0; i < args.size(); i++) {
371       String arg = args.get(i);
372       builder.append(arg);
373       if (i != args.size() - 1) {
374         builder.append(",");
375       }
376     }
377     conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
378   }
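
  /*
   * A hedged example of wiring a filter into an import job, assuming the driver only wants rows
   * whose keys start with "row-1"; PrefixFilter ships with HBase, but the table name, export path
   * and prefix value below are made up.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Import.addFilterAndArguments(conf, org.apache.hadoop.hbase.filter.PrefixFilter.class,
   *       java.util.Arrays.asList("row-1"));
   *   Job job = Import.createSubmittableJob(conf, new String[] { "mytable", "/export/mytable" });
   *   job.waitForCompletion(true);
   */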
379 
380   /**
381    * Sets up the actual job.
382    *
383    * @param conf  The current configuration.
384    * @param args  The command line parameters.
385    * @return The newly created job.
386    * @throws IOException When setting up the job fails.
387    */
388   public static Job createSubmittableJob(Configuration conf, String[] args)
389   throws IOException {
390     String tableName = args[0];
391     Path inputDir = new Path(args[1]);
392     Job job = new Job(conf, NAME + "_" + tableName);
393     job.setJarByClass(Importer.class);
394     FileInputFormat.setInputPaths(job, inputDir);
395     job.setInputFormatClass(SequenceFileInputFormat.class);
396     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
397 
398     // make sure we get the filter in the jars
399     try {
400       Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
401       if (filter != null) {
402         TableMapReduceUtil.addDependencyJars(conf, filter);
403       }
404     } catch (Exception e) {
405       throw new IOException(e);
406     }
407 
408     if (hfileOutPath != null) {
409       job.setMapperClass(KeyValueImporter.class);
410       HTable table = new HTable(conf, tableName);
411       job.setReducerClass(KeyValueSortReducer.class);
412       Path outputDir = new Path(hfileOutPath);
413       FileOutputFormat.setOutputPath(job, outputDir);
414       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
415       job.setMapOutputValueClass(KeyValue.class);
416       HFileOutputFormat.configureIncrementalLoad(job, table);
417       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
418           com.google.common.base.Preconditions.class);
419     } else {
420       // No reducers.  Just write straight to table.  Call initTableReducerJob
421       // because it sets up the TableOutputFormat.
422       job.setMapperClass(Importer.class);
423       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
424       job.setNumReduceTasks(0);
425     }
426     return job;
427   }
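
  /*
   * A sketch of the bulk-load variant, assuming the output path and table name used below. Setting
   * import.bulk.output makes the job write HFiles instead of going through the live table; the
   * generated files still have to be handed to LoadIncrementalHFiles (completebulkload) afterwards,
   * which this class does not do for you.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.set(BULK_OUTPUT_CONF_KEY, "/tmp/import-hfiles");
   *   Job job = createSubmittableJob(conf, new String[] { "mytable", "/export/mytable" });
   *   if (job.waitForCompletion(true)) {
   *     // then e.g.: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/import-hfiles mytable
   *   }
   */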
428 
429   /*
430    * @param errorMsg Error message.  Can be null.
431    */
432   private static void usage(final String errorMsg) {
433     if (errorMsg != null && errorMsg.length() > 0) {
434       System.err.println("ERROR: " + errorMsg);
435     }
436     System.err.println("Usage: Import [options] <tablename> <inputdir>");
437     System.err.println("By default Import will load data directly into HBase. To instead generate");
438     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
439     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
440     System.err
441         .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
442     System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
443     System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
444     System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
445         + CF_RENAME_PROP + " property. Further, filters will only use the"
446         + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify"
447         + " whether the current row needs to be ignored completely for processing and the"
448         + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
449         + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
450         + "the KeyValue.");
451     System.err.println("For performance consider the following options:\n"
452         + "  -Dmapred.map.tasks.speculative.execution=false\n"
453         + "  -Dmapred.reduce.tasks.speculative.execution=false");
454   }
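
  /*
   * Example invocations from the shell; the table name and paths are placeholders.
   *
   *   # load an Export directly back into the table
   *   hbase org.apache.hadoop.hbase.mapreduce.Import mytable /export/mytable
   *
   *   # write HFiles for a later bulk load instead of writing to the live table
   *   hbase org.apache.hadoop.hbase.mapreduce.Import \
   *       -Dimport.bulk.output=/tmp/import-hfiles mytable /export/mytable
   */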
455 
456   /**
457    * Main entry point.
458    *
459    * @param args  The command line parameters.
460    * @throws Exception When running the job fails.
461    */
462   public static void main(String[] args) throws Exception {
463     Configuration conf = HBaseConfiguration.create();
464     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
465     if (otherArgs.length < 2) {
466       usage("Wrong number of arguments: " + otherArgs.length);
467       System.exit(-1);
468     }
469     Job job = createSubmittableJob(conf, otherArgs);
470     System.exit(job.waitForCompletion(true) ? 0 : 1);
471   }
472 }