View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.IOException;
22  import java.lang.reflect.InvocationTargetException;
23  import java.lang.reflect.Method;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.TreeMap;
29  import java.util.UUID;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.classification.InterfaceStability;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.conf.Configured;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HBaseConfiguration;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValueUtil;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
45  import org.apache.hadoop.hbase.client.Delete;
46  import org.apache.hadoop.hbase.client.Durability;
47  import org.apache.hadoop.hbase.client.HBaseAdmin;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.Mutation;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.client.Result;
52  import org.apache.hadoop.hbase.filter.Filter;
53  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
56  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
57  import org.apache.hadoop.mapreduce.Job;
58  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
59  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
60  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
61  import org.apache.hadoop.util.GenericOptionsParser;
62  import org.apache.hadoop.util.Tool;
63  import org.apache.hadoop.util.ToolRunner;
64  import org.apache.zookeeper.KeeperException;
65  
66  
67  /**
68   * Import data written by {@link Export}.
69   */
70  @InterfaceAudience.Public
71  @InterfaceStability.Stable
72  public class Import extends Configured implements Tool {
73    private static final Log LOG = LogFactory.getLog(Import.class);
74    final static String NAME = "import";
75    public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
76    public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
77    public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
78    public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
79    public final static String TABLE_NAME = "import.table.name";
80    public final static String WAL_DURABILITY = "import.wal.durability";
81  
82    private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
83  
84    /**
85     * A mapper that just writes out KeyValues.
86     */
87    public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
88      private Map<byte[], byte[]> cfRenameMap;
89      private Filter filter;
90      private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
91  
92      /**
93       * @param row  The current table row key.
94       * @param value  The columns.
95       * @param context  The current context.
96       * @throws IOException When something is broken with the data.
97       */
98      @Override
99      public void map(ImmutableBytesWritable row, Result value,
100       Context context)
101     throws IOException {
102       try {
103         if (LOG.isTraceEnabled()) {
104           LOG.trace("Considering the row."
105               + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
106         }
107         if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
108           for (Cell kv : value.rawCells()) {
109             kv = filterKv(filter, kv);
110             // skip if we filtered it out
111             if (kv == null) continue;
112             // TODO get rid of ensureKeyValue
113             context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
114           }
115         }
116       } catch (InterruptedException e) {
117         e.printStackTrace();
118       }
119     }
120 
121     @Override
122     public void setup(Context context) {
123       cfRenameMap = createCfRenameMap(context.getConfiguration());
124       filter = instantiateFilter(context.getConfiguration());
125     }
126   }
127 
128   /**
129    * Write table content out to files in hdfs.
130    */
131   public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
132     private Map<byte[], byte[]> cfRenameMap;
133     private List<UUID> clusterIds;
134     private Filter filter;
135     private Durability durability;
136 
137     /**
138      * @param row  The current table row key.
139      * @param value  The columns.
140      * @param context  The current context.
141      * @throws IOException When something is broken with the data.
142      */
143     @Override
144     public void map(ImmutableBytesWritable row, Result value,
145       Context context)
146     throws IOException {
147       try {
148         writeResult(row, value, context);
149       } catch (InterruptedException e) {
150         e.printStackTrace();
151       }
152     }
153 
154     private void writeResult(ImmutableBytesWritable key, Result result, Context context)
155     throws IOException, InterruptedException {
156       Put put = null;
157       Delete delete = null;
158       if (LOG.isTraceEnabled()) {
159         LOG.trace("Considering the row."
160             + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
161       }
162       if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
163         processKV(key, result, context, put, delete);
164       }
165     }
166 
167     protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
168         Delete delete) throws IOException, InterruptedException {
169       for (Cell kv : result.rawCells()) {
170         kv = filterKv(filter, kv);
171         // skip if we filter it out
172         if (kv == null) continue;
173 
174         kv = convertKv(kv, cfRenameMap);
175         // Deletes and Puts are gathered and written when finished
176         /*
177          * If there are sequence of mutations and tombstones in an Export, and after Import the same
178          * sequence should be restored as it is. If we combine all Delete tombstones into single
179          * request then there is chance of ignoring few DeleteFamily tombstones, because if we
180          * submit multiple DeleteFamily tombstones in single Delete request then we are maintaining
181          * only newest in hbase table and ignoring other. Check - HBASE-12065
182          */
183         if (CellUtil.isDeleteFamily(kv)) {
184           Delete deleteFamily = new Delete(key.get());
185           deleteFamily.addDeleteMarker(kv);
186           if (durability != null) {
187             deleteFamily.setDurability(durability);
188           }
189           deleteFamily.setClusterIds(clusterIds);
190           context.write(key, deleteFamily);
191         } else if (CellUtil.isDelete(kv)) {
192           if (delete == null) {
193             delete = new Delete(key.get());
194           }
195           delete.addDeleteMarker(kv);
196         } else {
197           if (put == null) {
198             put = new Put(key.get());
199           }
200           addPutToKv(put, kv);
201         }
202       }
203       if (put != null) {
204         if (durability != null) {
205           put.setDurability(durability);
206         }
207         put.setClusterIds(clusterIds);
208         context.write(key, put);
209       }
210       if (delete != null) {
211         if (durability != null) {
212           delete.setDurability(durability);
213         }
214         delete.setClusterIds(clusterIds);
215         context.write(key, delete);
216       }
217     }
218 
219     protected void addPutToKv(Put put, Cell kv) throws IOException {
220       put.add(kv);
221     }
222 
223     @Override
224     public void setup(Context context) {
225       Configuration conf = context.getConfiguration();
226       cfRenameMap = createCfRenameMap(conf);
227       filter = instantiateFilter(conf);
228       String durabilityStr = conf.get(WAL_DURABILITY);
229       if(durabilityStr != null){
230         durability = Durability.valueOf(durabilityStr.toUpperCase());
231       }
232       // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
233       ZooKeeperWatcher zkw = null;
234       Exception ex = null;
235       try {
236         zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
237         clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
238       } catch (ZooKeeperConnectionException e) {
239         ex = e;
240         LOG.error("Problem connecting to ZooKeper during task setup", e);
241       } catch (KeeperException e) {
242         ex = e;
243         LOG.error("Problem reading ZooKeeper data during task setup", e);
244       } catch (IOException e) {
245         ex = e;
246         LOG.error("Problem setting up task", e);
247       } finally {
248         if (zkw != null) zkw.close();
249       }
250       if (clusterIds == null) {
251         // exit early if setup fails
252         throw new RuntimeException(ex);
253       }
254     }
255   }
256 
257   /**
258    * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
259    * optionally not include in the job output
260    * @param conf {@link Configuration} from which to load the filter
261    * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
262    * @throws IllegalArgumentException if the filter is misconfigured
263    */
264   public static Filter instantiateFilter(Configuration conf) {
265     // get the filter, if it was configured    
266     Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
267     if (filterClass == null) {
268       LOG.debug("No configured filter class, accepting all keyvalues.");
269       return null;
270     }
271     LOG.debug("Attempting to create filter:" + filterClass);
272     String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
273     ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
274     try {
275       Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
276       return (Filter) m.invoke(null, quotedArgs);
277     } catch (IllegalAccessException e) {
278       LOG.error("Couldn't instantiate filter!", e);
279       throw new RuntimeException(e);
280     } catch (SecurityException e) {
281       LOG.error("Couldn't instantiate filter!", e);
282       throw new RuntimeException(e);
283     } catch (NoSuchMethodException e) {
284       LOG.error("Couldn't instantiate filter!", e);
285       throw new RuntimeException(e);
286     } catch (IllegalArgumentException e) {
287       LOG.error("Couldn't instantiate filter!", e);
288       throw new RuntimeException(e);
289     } catch (InvocationTargetException e) {
290       LOG.error("Couldn't instantiate filter!", e);
291       throw new RuntimeException(e);
292     }
293   }
294 
295   private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
296     ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
297     for (String stringArg : stringArgs) {
298       // all the filters' instantiation methods expected quoted args since they are coming from
299       // the shell, so add them here, though it shouldn't really be needed :-/
300       quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
301     }
302     return quotedArgs;
303   }
304 
305   /**
306    * Attempt to filter out the keyvalue
307    * @param kv {@link KeyValue} on which to apply the filter
308    * @return <tt>null</tt> if the key should not be written, otherwise returns the original
309    *         {@link KeyValue}
310    */
311   public static Cell filterKv(Filter filter, Cell kv) throws IOException {
312     // apply the filter and skip this kv if the filter doesn't apply
313     if (filter != null) {
314       Filter.ReturnCode code = filter.filterKeyValue(kv);
315       if (LOG.isTraceEnabled()) {
316         LOG.trace("Filter returned:" + code + " for the key value:" + kv);
317       }
318       // if its not an accept type, then skip this kv
319       if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
320           .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
321         return null;
322       }
323     }
324     return kv;
325   }
326 
327   // helper: create a new KeyValue based on CF rename map
328   private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
329     if(cfRenameMap != null) {
330       // If there's a rename mapping for this CF, create a new KeyValue
331       byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
332       if(newCfName != null) {
333           kv = new KeyValue(kv.getRowArray(), // row buffer 
334                   kv.getRowOffset(),        // row offset
335                   kv.getRowLength(),        // row length
336                   newCfName,                // CF buffer
337                   0,                        // CF offset 
338                   newCfName.length,         // CF length 
339                   kv.getQualifierArray(),   // qualifier buffer
340                   kv.getQualifierOffset(),  // qualifier offset
341                   kv.getQualifierLength(),  // qualifier length
342                   kv.getTimestamp(),        // timestamp
343                   KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
344                   kv.getValueArray(),       // value buffer 
345                   kv.getValueOffset(),      // value offset
346                   kv.getValueLength());     // value length
347       }
348     }
349     return kv;
350   }
351 
352   // helper: make a map from sourceCfName to destCfName by parsing a config key
353   private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
354     Map<byte[], byte[]> cfRenameMap = null;
355     String allMappingsPropVal = conf.get(CF_RENAME_PROP);
356     if(allMappingsPropVal != null) {
357       // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
358       String[] allMappings = allMappingsPropVal.split(",");
359       for (String mapping: allMappings) {
360         if(cfRenameMap == null) {
361             cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
362         }
363         String [] srcAndDest = mapping.split(":");
364         if(srcAndDest.length != 2) {
365             continue;
366         }
367         cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
368       }
369     }
370     return cfRenameMap;
371   }
372 
373   /**
374    * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
375    * the mapper how to rename column families.
376    * 
377    * <p>Alternately, instead of calling this function, you could set the configuration key 
378    * {@link #CF_RENAME_PROP} yourself. The value should look like 
379    * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
380    * the mapper behavior.
381    * 
382    * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
383    *  set
384    * @param renameMap a mapping from source CF names to destination CF names
385    */
386   static public void configureCfRenaming(Configuration conf, 
387           Map<String, String> renameMap) {
388     StringBuilder sb = new StringBuilder();
389     for(Map.Entry<String,String> entry: renameMap.entrySet()) {
390       String sourceCf = entry.getKey();
391       String destCf = entry.getValue();
392 
393       if(sourceCf.contains(":") || sourceCf.contains(",") || 
394               destCf.contains(":") || destCf.contains(",")) {
395         throw new IllegalArgumentException("Illegal character in CF names: " 
396               + sourceCf + ", " + destCf);
397       }
398 
399       if(sb.length() != 0) {
400         sb.append(",");
401       }
402       sb.append(sourceCf + ":" + destCf);
403     }
404     conf.set(CF_RENAME_PROP, sb.toString());
405   }
406 
407   /**
408    * Add a Filter to be instantiated on import
409    * @param conf Configuration to update (will be passed to the job)
410    * @param clazz {@link Filter} subclass to instantiate on the server.
411    * @param filterArgs List of arguments to pass to the filter on instantiation
412    */
413   public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
414       List<String> filterArgs) throws IOException {
415     conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
416     conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
417   }
418 
419   /**
420    * Sets up the actual job.
421    * @param conf The current configuration.
422    * @param args The command line parameters.
423    * @return The newly created job.
424    * @throws IOException When setting up the job fails.
425    */
426   public static Job createSubmittableJob(Configuration conf, String[] args)
427   throws IOException {
428     TableName tableName = TableName.valueOf(args[0]);
429     conf.set(TABLE_NAME, tableName.getNameAsString());
430     Path inputDir = new Path(args[1]);
431     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
432     job.setJarByClass(Importer.class);
433     FileInputFormat.setInputPaths(job, inputDir);
434     job.setInputFormatClass(SequenceFileInputFormat.class);
435     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
436 
437     // make sure we get the filter in the jars
438     try {
439       Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
440       if (filter != null) {
441         TableMapReduceUtil.addDependencyJars(conf, filter);
442       }
443     } catch (Exception e) {
444       throw new IOException(e);
445     }
446 
447     if (hfileOutPath != null) {
448       job.setMapperClass(KeyValueImporter.class);
449       HTable table = new HTable(conf, tableName);
450       job.setReducerClass(KeyValueSortReducer.class);
451       Path outputDir = new Path(hfileOutPath);
452       FileOutputFormat.setOutputPath(job, outputDir);
453       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
454       job.setMapOutputValueClass(KeyValue.class);
455       HFileOutputFormat.configureIncrementalLoad(job, table);
456       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
457           com.google.common.base.Preconditions.class);
458     } else {
459       // No reducers.  Just write straight to table.  Call initTableReducerJob
460       // because it sets up the TableOutputFormat.
461       job.setMapperClass(Importer.class);
462       TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
463       job.setNumReduceTasks(0);
464     }
465     return job;
466   }
467 
468   /*
469    * @param errorMsg Error message.  Can be null.
470    */
471   private static void usage(final String errorMsg) {
472     if (errorMsg != null && errorMsg.length() > 0) {
473       System.err.println("ERROR: " + errorMsg);
474     }
475     System.err.println("Usage: Import [options] <tablename> <inputdir>");
476     System.err.println("By default Import will load data directly into HBase. To instead generate");
477     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
478     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
479     System.err
480         .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
481     System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
482     System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
483     System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
484         + CF_RENAME_PROP + " property. Futher, filters will only use the"
485         + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
486         + " whether the current row needs to be ignored completely for processing and "
487         + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
488         + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
489         + " the KeyValue.");
490     System.err.println("   -D " + JOB_NAME_CONF_KEY
491         + "=jobName - use the specified mapreduce job name for the import");
492     System.err.println("For performance consider the following options:\n"
493         + "  -Dmapreduce.map.speculative=false\n"
494         + "  -Dmapreduce.reduce.speculative=false\n"
495         + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
496             +" Allowed values are the supported durability values"
497             +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
498   }
499 
500   /**
501    * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
502    * need to flush all the regions of the table as the data is held in memory and is also not
503    * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the
504    * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL}
505    */
506   public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
507       InterruptedException {
508     String tableName = conf.get(TABLE_NAME);
509     HBaseAdmin hAdmin = null;
510     String durability = conf.get(WAL_DURABILITY);
511     // Need to flush if the data is written to hbase and skip wal is enabled.
512     if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
513         && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
514       try {
515         hAdmin = new HBaseAdmin(conf);
516         hAdmin.flush(tableName);
517       } finally {
518         if (hAdmin != null) {
519           hAdmin.close();
520         }
521       }
522     }
523   }
524 
525   @Override
526   public int run(String[] args) throws Exception {
527     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
528     if (otherArgs.length < 2) {
529       usage("Wrong number of arguments: " + otherArgs.length);
530       return -1;
531     }
532     String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
533     if (inputVersionString != null) {
534       getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
535     }
536     Job job = createSubmittableJob(getConf(), otherArgs);
537     boolean isJobSuccessful = job.waitForCompletion(true);
538     if(isJobSuccessful){
539       // Flush all the regions of the table
540       flushRegionsIfNecessary(getConf());
541     }
542     return (isJobSuccessful ? 0 : 1);
543   }
544 
545   /**
546    * Main entry point.
547    * @param args The command line parameters.
548    * @throws Exception When running the job fails.
549    */
550   public static void main(String[] args) throws Exception {
551     int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
552     System.exit(errCode);
553   }
554 
555 }