View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.zookeeper.KeeperException;
61  
62  
63  /**
64   * Import data written by {@link Export}.
65   */
66  @InterfaceAudience.Public
67  @InterfaceStability.Stable
68  public class Import {
  /** Logger used by the static helpers of this class and by {@link Importer}. */
  private static final Log LOG = LogFactory.getLog(Import.class);
  /** Prefix of the name given to the job built by {@link #createSubmittableJob}. */
  final static String NAME = "import";
  /**
   * Configuration key holding the column family rename mapping, formatted as
   * <code>sourceCf1:destCf1,sourceCf2:destCf2,...</code>.
   */
  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
  /**
   * Configuration key naming an output path. When it is set the job is configured to produce
   * HFiles under that path instead of writing directly to the table.
   */
  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
  /** Configuration key naming the {@link Filter} class to apply to the imported data. */
  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
  /** Configuration key holding the comma separated arguments handed to the filter factory. */
  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
  /** Configuration key under which {@link #createSubmittableJob} records the target table. */
  public final static String TABLE_NAME = "import.table.name";
  /** Configuration key selecting the {@link Durability} set on mutations emitted by {@link Importer}. */
  public final static String WAL_DURABILITY = "import.wal.durability";
77  
78    /**
79     * A mapper that just writes out KeyValues.
80     */
81    public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
82      private Map<byte[], byte[]> cfRenameMap;
83      private Filter filter;
84      private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
85  
86      /**
87       * @param row  The current table row key.
88       * @param value  The columns.
89       * @param context  The current context.
90       * @throws IOException When something is broken with the data.
91       */
92      @Override
93      public void map(ImmutableBytesWritable row, Result value,
94        Context context)
95      throws IOException {
96        try {
97          if (LOG.isTraceEnabled()) {
98            LOG.trace("Considering the row."
99                + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
100         }
101         if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
102           for (Cell kv : value.rawCells()) {
103             kv = filterKv(filter, kv);
104             // skip if we filtered it out
105             if (kv == null) continue;
106             // TODO get rid of ensureKeyValue
107             context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
108           }
109         }
110       } catch (InterruptedException e) {
111         e.printStackTrace();
112       }
113     }
114 
115     @Override
116     public void setup(Context context) {
117       cfRenameMap = createCfRenameMap(context.getConfiguration());
118       filter = instantiateFilter(context.getConfiguration());
119     }
120   }
121 
122   /**
123    * Write table content out to files in hdfs.
124    */
125   public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
126     private Map<byte[], byte[]> cfRenameMap;
127     private List<UUID> clusterIds;
128     private Filter filter;
129     private Durability durability;
130 
131     /**
132      * @param row  The current table row key.
133      * @param value  The columns.
134      * @param context  The current context.
135      * @throws IOException When something is broken with the data.
136      */
137     @Override
138     public void map(ImmutableBytesWritable row, Result value,
139       Context context)
140     throws IOException {
141       try {
142         writeResult(row, value, context);
143       } catch (InterruptedException e) {
144         e.printStackTrace();
145       }
146     }
147 
148     private void writeResult(ImmutableBytesWritable key, Result result, Context context)
149     throws IOException, InterruptedException {
150       Put put = null;
151       Delete delete = null;
152       if (LOG.isTraceEnabled()) {
153         LOG.trace("Considering the row."
154             + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
155       }
156       if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
157         for (Cell kv : result.rawCells()) {
158           kv = filterKv(filter, kv);
159           // skip if we filter it out
160           if (kv == null) continue;
161 
162           kv = convertKv(kv, cfRenameMap);
163           // Deletes and Puts are gathered and written when finished
164           if (CellUtil.isDelete(kv)) {
165             if (delete == null) {
166               delete = new Delete(key.get());
167             }
168             delete.addDeleteMarker(kv);
169           } else {
170             if (put == null) {
171               put = new Put(key.get());
172             }
173             put.add(kv);
174           }
175         }
176         if (put != null) {
177           if (durability != null) {
178             put.setDurability(durability);
179           }
180           put.setClusterIds(clusterIds);
181           context.write(key, put);
182         }
183         if (delete != null) {
184           if (durability != null) {
185             delete.setDurability(durability);
186           }
187           delete.setClusterIds(clusterIds);
188           context.write(key, delete);
189         }
190       }
191     }
192 
193     @Override
194     public void setup(Context context) {
195       Configuration conf = context.getConfiguration();
196       cfRenameMap = createCfRenameMap(conf);
197       filter = instantiateFilter(conf);
198       String durabilityStr = conf.get(WAL_DURABILITY);
199       if(durabilityStr != null){
200         durability = Durability.valueOf(durabilityStr.toUpperCase());
201       }
202       // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
203       ZooKeeperWatcher zkw = null;
204       try {
205         zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
206         clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
207       } catch (ZooKeeperConnectionException e) {
208         LOG.error("Problem connecting to ZooKeper during task setup", e);
209       } catch (KeeperException e) {
210         LOG.error("Problem reading ZooKeeper data during task setup", e);
211       } catch (IOException e) {
212         LOG.error("Problem setting up task", e);
213       } finally {
214         if (zkw != null) zkw.close();
215       }
216     }
217   }
218 
219   /**
220    * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
221    * optionally not include in the job output
222    * @param conf {@link Configuration} from which to load the filter
223    * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
224    * @throws IllegalArgumentException if the filter is misconfigured
225    */
226   public static Filter instantiateFilter(Configuration conf) {
227     // get the filter, if it was configured    
228     Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
229     if (filterClass == null) {
230       LOG.debug("No configured filter class, accepting all keyvalues.");
231       return null;
232     }
233     LOG.debug("Attempting to create filter:" + filterClass);
234     String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
235     ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
236     try {
237       Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
238       return (Filter) m.invoke(null, quotedArgs);
239     } catch (IllegalAccessException e) {
240       LOG.error("Couldn't instantiate filter!", e);
241       throw new RuntimeException(e);
242     } catch (SecurityException e) {
243       LOG.error("Couldn't instantiate filter!", e);
244       throw new RuntimeException(e);
245     } catch (NoSuchMethodException e) {
246       LOG.error("Couldn't instantiate filter!", e);
247       throw new RuntimeException(e);
248     } catch (IllegalArgumentException e) {
249       LOG.error("Couldn't instantiate filter!", e);
250       throw new RuntimeException(e);
251     } catch (InvocationTargetException e) {
252       LOG.error("Couldn't instantiate filter!", e);
253       throw new RuntimeException(e);
254     }
255   }
256 
257   private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
258     ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
259     for (String stringArg : stringArgs) {
260       // all the filters' instantiation methods expected quoted args since they are coming from
261       // the shell, so add them here, though it shouldn't really be needed :-/
262       quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
263     }
264     return quotedArgs;
265   }
266 
267   /**
268    * Attempt to filter out the keyvalue
269    * @param kv {@link KeyValue} on which to apply the filter
270    * @return <tt>null</tt> if the key should not be written, otherwise returns the original
271    *         {@link KeyValue}
272    */
273   public static Cell filterKv(Filter filter, Cell kv) throws IOException {
274     // apply the filter and skip this kv if the filter doesn't apply
275     if (filter != null) {
276       Filter.ReturnCode code = filter.filterKeyValue(kv);
277       if (LOG.isTraceEnabled()) {
278         LOG.trace("Filter returned:" + code + " for the key value:" + kv);
279       }
280       // if its not an accept type, then skip this kv
281       if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
282           .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
283         return null;
284       }
285     }
286     return kv;
287   }
288 
289   // helper: create a new KeyValue based on CF rename map
290   private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
291     if(cfRenameMap != null) {
292       // If there's a rename mapping for this CF, create a new KeyValue
293       byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
294       if(newCfName != null) {
295           kv = new KeyValue(kv.getRowArray(), // row buffer 
296                   kv.getRowOffset(),        // row offset
297                   kv.getRowLength(),        // row length
298                   newCfName,                // CF buffer
299                   0,                        // CF offset 
300                   newCfName.length,         // CF length 
301                   kv.getQualifierArray(),   // qualifier buffer
302                   kv.getQualifierOffset(),  // qualifier offset
303                   kv.getQualifierLength(),  // qualifier length
304                   kv.getTimestamp(),        // timestamp
305                   KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
306                   kv.getValueArray(),       // value buffer 
307                   kv.getValueOffset(),      // value offset
308                   kv.getValueLength());     // value length
309       }
310     }
311     return kv;
312   }
313 
314   // helper: make a map from sourceCfName to destCfName by parsing a config key
315   private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
316     Map<byte[], byte[]> cfRenameMap = null;
317     String allMappingsPropVal = conf.get(CF_RENAME_PROP);
318     if(allMappingsPropVal != null) {
319       // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
320       String[] allMappings = allMappingsPropVal.split(",");
321       for (String mapping: allMappings) {
322         if(cfRenameMap == null) {
323             cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
324         }
325         String [] srcAndDest = mapping.split(":");
326         if(srcAndDest.length != 2) {
327             continue;
328         }
329         cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
330       }
331     }
332     return cfRenameMap;
333   }
334 
335   /**
336    * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
337    * the mapper how to rename column families.
338    * 
339    * <p>Alternately, instead of calling this function, you could set the configuration key 
340    * {@link #CF_RENAME_PROP} yourself. The value should look like 
341    * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
342    * the mapper behavior.
343    * 
344    * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
345    *  set
346    * @param renameMap a mapping from source CF names to destination CF names
347    */
348   static public void configureCfRenaming(Configuration conf, 
349           Map<String, String> renameMap) {
350     StringBuilder sb = new StringBuilder();
351     for(Map.Entry<String,String> entry: renameMap.entrySet()) {
352       String sourceCf = entry.getKey();
353       String destCf = entry.getValue();
354 
355       if(sourceCf.contains(":") || sourceCf.contains(",") || 
356               destCf.contains(":") || destCf.contains(",")) {
357         throw new IllegalArgumentException("Illegal character in CF names: " 
358               + sourceCf + ", " + destCf);
359       }
360 
361       if(sb.length() != 0) {
362         sb.append(",");
363       }
364       sb.append(sourceCf + ":" + destCf);
365     }
366     conf.set(CF_RENAME_PROP, sb.toString());
367   }
368 
369   /**
370    * Add a Filter to be instantiated on import
371    * @param conf Configuration to update (will be passed to the job)
372    * @param clazz {@link Filter} subclass to instantiate on the server.
373    * @param filterArgs List of arguments to pass to the filter on instantiation
374    */
375   public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
376       List<String> filterArgs) throws IOException {
377     conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
378     conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
379   }
380 
381   /**
382    * Sets up the actual job.
383    * @param conf The current configuration.
384    * @param args The command line parameters.
385    * @return The newly created job.
386    * @throws IOException When setting up the job fails.
387    */
388   public static Job createSubmittableJob(Configuration conf, String[] args)
389   throws IOException {
390     String tableName = args[0];
391     conf.set(TABLE_NAME, tableName);
392     Path inputDir = new Path(args[1]);
393     Job job = new Job(conf, NAME + "_" + tableName);
394     job.setJarByClass(Importer.class);
395     FileInputFormat.setInputPaths(job, inputDir);
396     job.setInputFormatClass(SequenceFileInputFormat.class);
397     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
398 
399     // make sure we get the filter in the jars
400     try {
401       Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
402       if (filter != null) {
403         TableMapReduceUtil.addDependencyJars(conf, filter);
404       }
405     } catch (Exception e) {
406       throw new IOException(e);
407     }
408 
409     if (hfileOutPath != null) {
410       job.setMapperClass(KeyValueImporter.class);
411       HTable table = new HTable(conf, tableName);
412       job.setReducerClass(KeyValueSortReducer.class);
413       Path outputDir = new Path(hfileOutPath);
414       FileOutputFormat.setOutputPath(job, outputDir);
415       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
416       job.setMapOutputValueClass(KeyValue.class);
417       HFileOutputFormat.configureIncrementalLoad(job, table);
418       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
419           com.google.common.base.Preconditions.class);
420     } else {
421       // No reducers.  Just write straight to table.  Call initTableReducerJob
422       // because it sets up the TableOutputFormat.
423       job.setMapperClass(Importer.class);
424       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
425       job.setNumReduceTasks(0);
426     }
427     return job;
428   }
429 
430   /*
431    * @param errorMsg Error message.  Can be null.
432    */
433   private static void usage(final String errorMsg) {
434     if (errorMsg != null && errorMsg.length() > 0) {
435       System.err.println("ERROR: " + errorMsg);
436     }
437     System.err.println("Usage: Import [options] <tablename> <inputdir>");
438     System.err.println("By default Import will load data directly into HBase. To instead generate");
439     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
440     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
441     System.err
442         .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
443     System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
444     System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
445     System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
446         + CF_RENAME_PROP + " property. Futher, filters will only use the"
447         + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
448         + " whether the current row needs to be ignored completely for processing and "
449         + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
450         + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
451         + " the KeyValue.");
452     System.err.println("For performance consider the following options:\n"
453         + "  -Dmapreduce.map.speculative=false\n"
454         + "  -Dmapreduce.reduce.speculative=false\n"
455         + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
456             +" Allowed values are the supported durability values"
457             +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
458   }
459 
460   /**
461    * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
462    * need to flush all the regions of the table as the data is held in memory and is also not
463    * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the
464    * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL}
465    */
466   public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
467       InterruptedException {
468     String tableName = conf.get(TABLE_NAME);
469     HBaseAdmin hAdmin = null;
470     String durability = conf.get(WAL_DURABILITY);
471     // Need to flush if the data is written to hbase and skip wal is enabled.
472     if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
473         && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
474       try {
475         hAdmin = new HBaseAdmin(conf);
476         hAdmin.flush(tableName);
477       } finally {
478         if (hAdmin != null) {
479           hAdmin.close();
480         }
481       }
482     }
483   }
484 
485   /**
486    * Main entry point.
487    *
488    * @param args  The command line parameters.
489    * @throws Exception When running the job fails.
490    */
491   public static void main(String[] args) throws Exception {
492     Configuration conf = HBaseConfiguration.create();
493     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
494     if (otherArgs.length < 2) {
495       usage("Wrong number of arguments: " + otherArgs.length);
496       System.exit(-1);
497     }
498     String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
499     if (inputVersionString != null) {
500       conf.set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
501     }
502     Job job = createSubmittableJob(conf, otherArgs);
503     boolean isJobSuccessful = job.waitForCompletion(true);
504     if(isJobSuccessful){
505       // Flush all the regions of the table
506       flushRegionsIfNecessary(conf);
507     }
508     System.exit(job.waitForCompletion(true) ? 0 : 1);
509   }
510 }