View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.IOException;
22  import java.lang.reflect.InvocationTargetException;
23  import java.lang.reflect.Method;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.TreeMap;
29  import java.util.UUID;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.classification.InterfaceStability;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.conf.Configured;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HBaseConfiguration;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValueUtil;
43  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
44  import org.apache.hadoop.hbase.client.Delete;
45  import org.apache.hadoop.hbase.client.Durability;
46  import org.apache.hadoop.hbase.client.HBaseAdmin;
47  import org.apache.hadoop.hbase.client.HTable;
48  import org.apache.hadoop.hbase.client.Mutation;
49  import org.apache.hadoop.hbase.client.Put;
50  import org.apache.hadoop.hbase.client.Result;
51  import org.apache.hadoop.hbase.filter.Filter;
52  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
53  import org.apache.hadoop.hbase.util.Bytes;
54  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
55  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
56  import org.apache.hadoop.mapreduce.Job;
57  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
58  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
59  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
60  import org.apache.hadoop.util.GenericOptionsParser;
61  import org.apache.hadoop.util.Tool;
62  import org.apache.hadoop.util.ToolRunner;
63  import org.apache.zookeeper.KeeperException;
64  
65  
66  /**
67   * Import data written by {@link Export}.
68   */
69  @InterfaceAudience.Public
70  @InterfaceStability.Stable
71  public class Import extends Configured implements Tool {
72    private static final Log LOG = LogFactory.getLog(Import.class);
73    final static String NAME = "import";
74    public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
75    public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
76    public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
77    public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
78    public final static String TABLE_NAME = "import.table.name";
79    public final static String WAL_DURABILITY = "import.wal.durability";
80  
81    private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
82  
83    /**
84     * A mapper that just writes out KeyValues.
85     */
86    public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
87      private Map<byte[], byte[]> cfRenameMap;
88      private Filter filter;
89      private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
90  
91      /**
92       * @param row  The current table row key.
93       * @param value  The columns.
94       * @param context  The current context.
95       * @throws IOException When something is broken with the data.
96       */
97      @Override
98      public void map(ImmutableBytesWritable row, Result value,
99        Context context)
100     throws IOException {
101       try {
102         if (LOG.isTraceEnabled()) {
103           LOG.trace("Considering the row."
104               + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
105         }
106         if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
107           for (Cell kv : value.rawCells()) {
108             kv = filterKv(filter, kv);
109             // skip if we filtered it out
110             if (kv == null) continue;
111             // TODO get rid of ensureKeyValue
112             context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
113           }
114         }
115       } catch (InterruptedException e) {
116         e.printStackTrace();
117       }
118     }
119 
120     @Override
121     public void setup(Context context) {
122       cfRenameMap = createCfRenameMap(context.getConfiguration());
123       filter = instantiateFilter(context.getConfiguration());
124     }
125   }
126 
127   /**
128    * Write table content out to files in hdfs.
129    */
130   public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
131     private Map<byte[], byte[]> cfRenameMap;
132     private List<UUID> clusterIds;
133     private Filter filter;
134     private Durability durability;
135 
136     /**
137      * @param row  The current table row key.
138      * @param value  The columns.
139      * @param context  The current context.
140      * @throws IOException When something is broken with the data.
141      */
142     @Override
143     public void map(ImmutableBytesWritable row, Result value,
144       Context context)
145     throws IOException {
146       try {
147         writeResult(row, value, context);
148       } catch (InterruptedException e) {
149         e.printStackTrace();
150       }
151     }
152 
153     private void writeResult(ImmutableBytesWritable key, Result result, Context context)
154     throws IOException, InterruptedException {
155       Put put = null;
156       Delete delete = null;
157       if (LOG.isTraceEnabled()) {
158         LOG.trace("Considering the row."
159             + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
160       }
161       if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
162         processKV(key, result, context, put, delete);
163       }
164     }
165 
166     protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
167         Delete delete) throws IOException, InterruptedException {
168       for (Cell kv : result.rawCells()) {
169         kv = filterKv(filter, kv);
170         // skip if we filter it out
171         if (kv == null) continue;
172 
173         kv = convertKv(kv, cfRenameMap);
174         // Deletes and Puts are gathered and written when finished
175         if (CellUtil.isDelete(kv)) {
176           if (delete == null) {
177             delete = new Delete(key.get());
178           }
179           delete.addDeleteMarker(kv);
180         } else {
181           if (put == null) {
182             put = new Put(key.get());
183           }
184           addPutToKv(put, kv);
185         }
186       }
187       if (put != null) {
188         if (durability != null) {
189           put.setDurability(durability);
190         }
191         put.setClusterIds(clusterIds);
192         context.write(key, put);
193       }
194       if (delete != null) {
195         if (durability != null) {
196           delete.setDurability(durability);
197         }
198         delete.setClusterIds(clusterIds);
199         context.write(key, delete);
200       }
201     }
202 
203     protected void addPutToKv(Put put, Cell kv) throws IOException {
204       put.add(kv);
205     }
206 
207     @Override
208     public void setup(Context context) {
209       Configuration conf = context.getConfiguration();
210       cfRenameMap = createCfRenameMap(conf);
211       filter = instantiateFilter(conf);
212       String durabilityStr = conf.get(WAL_DURABILITY);
213       if(durabilityStr != null){
214         durability = Durability.valueOf(durabilityStr.toUpperCase());
215       }
216       // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
217       ZooKeeperWatcher zkw = null;
218       try {
219         zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
220         clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
221       } catch (ZooKeeperConnectionException e) {
222         LOG.error("Problem connecting to ZooKeper during task setup", e);
223       } catch (KeeperException e) {
224         LOG.error("Problem reading ZooKeeper data during task setup", e);
225       } catch (IOException e) {
226         LOG.error("Problem setting up task", e);
227       } finally {
228         if (zkw != null) zkw.close();
229       }
230     }
231   }
232 
233   /**
234    * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
235    * optionally not include in the job output
236    * @param conf {@link Configuration} from which to load the filter
237    * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
238    * @throws IllegalArgumentException if the filter is misconfigured
239    */
240   public static Filter instantiateFilter(Configuration conf) {
241     // get the filter, if it was configured    
242     Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
243     if (filterClass == null) {
244       LOG.debug("No configured filter class, accepting all keyvalues.");
245       return null;
246     }
247     LOG.debug("Attempting to create filter:" + filterClass);
248     String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
249     ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
250     try {
251       Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
252       return (Filter) m.invoke(null, quotedArgs);
253     } catch (IllegalAccessException e) {
254       LOG.error("Couldn't instantiate filter!", e);
255       throw new RuntimeException(e);
256     } catch (SecurityException e) {
257       LOG.error("Couldn't instantiate filter!", e);
258       throw new RuntimeException(e);
259     } catch (NoSuchMethodException e) {
260       LOG.error("Couldn't instantiate filter!", e);
261       throw new RuntimeException(e);
262     } catch (IllegalArgumentException e) {
263       LOG.error("Couldn't instantiate filter!", e);
264       throw new RuntimeException(e);
265     } catch (InvocationTargetException e) {
266       LOG.error("Couldn't instantiate filter!", e);
267       throw new RuntimeException(e);
268     }
269   }
270 
271   private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
272     ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
273     for (String stringArg : stringArgs) {
274       // all the filters' instantiation methods expected quoted args since they are coming from
275       // the shell, so add them here, though it shouldn't really be needed :-/
276       quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
277     }
278     return quotedArgs;
279   }
280 
281   /**
282    * Attempt to filter out the keyvalue
283    * @param kv {@link KeyValue} on which to apply the filter
284    * @return <tt>null</tt> if the key should not be written, otherwise returns the original
285    *         {@link KeyValue}
286    */
287   public static Cell filterKv(Filter filter, Cell kv) throws IOException {
288     // apply the filter and skip this kv if the filter doesn't apply
289     if (filter != null) {
290       Filter.ReturnCode code = filter.filterKeyValue(kv);
291       if (LOG.isTraceEnabled()) {
292         LOG.trace("Filter returned:" + code + " for the key value:" + kv);
293       }
294       // if its not an accept type, then skip this kv
295       if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
296           .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
297         return null;
298       }
299     }
300     return kv;
301   }
302 
303   // helper: create a new KeyValue based on CF rename map
304   private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
305     if(cfRenameMap != null) {
306       // If there's a rename mapping for this CF, create a new KeyValue
307       byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
308       if(newCfName != null) {
309           kv = new KeyValue(kv.getRowArray(), // row buffer 
310                   kv.getRowOffset(),        // row offset
311                   kv.getRowLength(),        // row length
312                   newCfName,                // CF buffer
313                   0,                        // CF offset 
314                   newCfName.length,         // CF length 
315                   kv.getQualifierArray(),   // qualifier buffer
316                   kv.getQualifierOffset(),  // qualifier offset
317                   kv.getQualifierLength(),  // qualifier length
318                   kv.getTimestamp(),        // timestamp
319                   KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
320                   kv.getValueArray(),       // value buffer 
321                   kv.getValueOffset(),      // value offset
322                   kv.getValueLength());     // value length
323       }
324     }
325     return kv;
326   }
327 
328   // helper: make a map from sourceCfName to destCfName by parsing a config key
329   private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
330     Map<byte[], byte[]> cfRenameMap = null;
331     String allMappingsPropVal = conf.get(CF_RENAME_PROP);
332     if(allMappingsPropVal != null) {
333       // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
334       String[] allMappings = allMappingsPropVal.split(",");
335       for (String mapping: allMappings) {
336         if(cfRenameMap == null) {
337             cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
338         }
339         String [] srcAndDest = mapping.split(":");
340         if(srcAndDest.length != 2) {
341             continue;
342         }
343         cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
344       }
345     }
346     return cfRenameMap;
347   }
348 
349   /**
350    * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
351    * the mapper how to rename column families.
352    * 
353    * <p>Alternately, instead of calling this function, you could set the configuration key 
354    * {@link #CF_RENAME_PROP} yourself. The value should look like 
355    * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
356    * the mapper behavior.
357    * 
358    * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
359    *  set
360    * @param renameMap a mapping from source CF names to destination CF names
361    */
362   static public void configureCfRenaming(Configuration conf, 
363           Map<String, String> renameMap) {
364     StringBuilder sb = new StringBuilder();
365     for(Map.Entry<String,String> entry: renameMap.entrySet()) {
366       String sourceCf = entry.getKey();
367       String destCf = entry.getValue();
368 
369       if(sourceCf.contains(":") || sourceCf.contains(",") || 
370               destCf.contains(":") || destCf.contains(",")) {
371         throw new IllegalArgumentException("Illegal character in CF names: " 
372               + sourceCf + ", " + destCf);
373       }
374 
375       if(sb.length() != 0) {
376         sb.append(",");
377       }
378       sb.append(sourceCf + ":" + destCf);
379     }
380     conf.set(CF_RENAME_PROP, sb.toString());
381   }
382 
383   /**
384    * Add a Filter to be instantiated on import
385    * @param conf Configuration to update (will be passed to the job)
386    * @param clazz {@link Filter} subclass to instantiate on the server.
387    * @param filterArgs List of arguments to pass to the filter on instantiation
388    */
389   public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
390       List<String> filterArgs) throws IOException {
391     conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
392     conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
393   }
394 
395   /**
396    * Sets up the actual job.
397    * @param conf The current configuration.
398    * @param args The command line parameters.
399    * @return The newly created job.
400    * @throws IOException When setting up the job fails.
401    */
402   public static Job createSubmittableJob(Configuration conf, String[] args)
403   throws IOException {
404     String tableName = args[0];
405     conf.set(TABLE_NAME, tableName);
406     Path inputDir = new Path(args[1]);
407     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
408     job.setJarByClass(Importer.class);
409     FileInputFormat.setInputPaths(job, inputDir);
410     job.setInputFormatClass(SequenceFileInputFormat.class);
411     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
412 
413     // make sure we get the filter in the jars
414     try {
415       Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
416       if (filter != null) {
417         TableMapReduceUtil.addDependencyJars(conf, filter);
418       }
419     } catch (Exception e) {
420       throw new IOException(e);
421     }
422 
423     if (hfileOutPath != null) {
424       job.setMapperClass(KeyValueImporter.class);
425       HTable table = new HTable(conf, tableName);
426       job.setReducerClass(KeyValueSortReducer.class);
427       Path outputDir = new Path(hfileOutPath);
428       FileOutputFormat.setOutputPath(job, outputDir);
429       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
430       job.setMapOutputValueClass(KeyValue.class);
431       HFileOutputFormat.configureIncrementalLoad(job, table);
432       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
433           com.google.common.base.Preconditions.class);
434     } else {
435       // No reducers.  Just write straight to table.  Call initTableReducerJob
436       // because it sets up the TableOutputFormat.
437       job.setMapperClass(Importer.class);
438       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
439       job.setNumReduceTasks(0);
440     }
441     return job;
442   }
443 
444   /*
445    * @param errorMsg Error message.  Can be null.
446    */
447   private static void usage(final String errorMsg) {
448     if (errorMsg != null && errorMsg.length() > 0) {
449       System.err.println("ERROR: " + errorMsg);
450     }
451     System.err.println("Usage: Import [options] <tablename> <inputdir>");
452     System.err.println("By default Import will load data directly into HBase. To instead generate");
453     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
454     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
455     System.err
456         .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
457     System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
458     System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
459     System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
460         + CF_RENAME_PROP + " property. Futher, filters will only use the"
461         + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
462         + " whether the current row needs to be ignored completely for processing and "
463         + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
464         + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
465         + " the KeyValue.");
466     System.err.println("   -D " + JOB_NAME_CONF_KEY
467         + "=jobName - use the specified mapreduce job name for the import");
468     System.err.println("For performance consider the following options:\n"
469         + "  -Dmapreduce.map.speculative=false\n"
470         + "  -Dmapreduce.reduce.speculative=false\n"
471         + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
472             +" Allowed values are the supported durability values"
473             +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
474   }
475 
476   /**
477    * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
478    * need to flush all the regions of the table as the data is held in memory and is also not
479    * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the
480    * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL}
481    */
482   public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
483       InterruptedException {
484     String tableName = conf.get(TABLE_NAME);
485     HBaseAdmin hAdmin = null;
486     String durability = conf.get(WAL_DURABILITY);
487     // Need to flush if the data is written to hbase and skip wal is enabled.
488     if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
489         && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
490       try {
491         hAdmin = new HBaseAdmin(conf);
492         hAdmin.flush(tableName);
493       } finally {
494         if (hAdmin != null) {
495           hAdmin.close();
496         }
497       }
498     }
499   }
500 
501   @Override
502   public int run(String[] args) throws Exception {
503     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
504     if (otherArgs.length < 2) {
505       usage("Wrong number of arguments: " + otherArgs.length);
506       return -1;
507     }
508     String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
509     if (inputVersionString != null) {
510       getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
511     }
512     Job job = createSubmittableJob(getConf(), otherArgs);
513     boolean isJobSuccessful = job.waitForCompletion(true);
514     if(isJobSuccessful){
515       // Flush all the regions of the table
516       flushRegionsIfNecessary(getConf());
517     }
518     return (isJobSuccessful ? 0 : 1);
519   }
520 
521   /**
522    * Main entry point.
523    * @param args The command line parameters.
524    * @throws Exception When running the job fails.
525    */
526   public static void main(String[] args) throws Exception {
527     int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
528     System.exit(errCode);
529   }
530 
531 }