View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.IOException;
22  import java.lang.reflect.InvocationTargetException;
23  import java.lang.reflect.Method;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.TreeMap;
29  import java.util.UUID;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.classification.InterfaceStability;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellUtil;
39  import org.apache.hadoop.hbase.HBaseConfiguration;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.KeyValueUtil;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
44  import org.apache.hadoop.hbase.client.Connection;
45  import org.apache.hadoop.hbase.client.ConnectionFactory;
46  import org.apache.hadoop.hbase.client.Delete;
47  import org.apache.hadoop.hbase.client.Durability;
48  import org.apache.hadoop.hbase.client.HBaseAdmin;
49  import org.apache.hadoop.hbase.client.Mutation;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.client.RegionLocator;
52  import org.apache.hadoop.hbase.client.Result;
53  import org.apache.hadoop.hbase.client.Table;
54  import org.apache.hadoop.hbase.filter.Filter;
55  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
56  import org.apache.hadoop.hbase.util.Bytes;
57  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
58  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
59  import org.apache.hadoop.mapreduce.Job;
60  import org.apache.hadoop.mapreduce.TaskCounter;
61  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
62  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
63  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
64  import org.apache.hadoop.util.GenericOptionsParser;
65  import org.apache.zookeeper.KeeperException;
66  
67  
68  /**
69   * Import data written by {@link Export}.
70   */
71  @InterfaceAudience.Public
72  @InterfaceStability.Stable
73  public class Import {
74    private static final Log LOG = LogFactory.getLog(Import.class);
75    final static String NAME = "import";
76    public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
77    public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
78    public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
79    public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
80    public final static String TABLE_NAME = "import.table.name";
81    public final static String WAL_DURABILITY = "import.wal.durability";
82  
83    /**
84     * A mapper that just writes out KeyValues.
85     */
86    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
87        justification="Writables are going away and this has been this way forever")
88    public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
89      private Map<byte[], byte[]> cfRenameMap;
90      private Filter filter;
91      private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
92  
93      /**
94       * @param row  The current table row key.
95       * @param value  The columns.
96       * @param context  The current context.
97       * @throws IOException When something is broken with the data.
98       */
99      @Override
100     public void map(ImmutableBytesWritable row, Result value,
101       Context context)
102     throws IOException {
103       try {
104         if (LOG.isTraceEnabled()) {
105           LOG.trace("Considering the row."
106               + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
107         }
108         if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
109           for (Cell kv : value.rawCells()) {
110             kv = filterKv(filter, kv);
111             // skip if we filtered it out
112             if (kv == null) continue;
113             // TODO get rid of ensureKeyValue
114             context.write(row, KeyValueUtil.ensureKeyValueTypeForMR(convertKv(kv, cfRenameMap)));
115           }
116         }
117       } catch (InterruptedException e) {
118         e.printStackTrace();
119       }
120     }
121 
122     @Override
123     public void setup(Context context) {
124       cfRenameMap = createCfRenameMap(context.getConfiguration());
125       filter = instantiateFilter(context.getConfiguration());
126     }
127   }
128 
129   /**
130    * Write table content out to files in hdfs.
131    */
132   public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
133     private Map<byte[], byte[]> cfRenameMap;
134     private List<UUID> clusterIds;
135     private Filter filter;
136     private Durability durability;
137 
138     /**
139      * @param row  The current table row key.
140      * @param value  The columns.
141      * @param context  The current context.
142      * @throws IOException When something is broken with the data.
143      */
144     @Override
145     public void map(ImmutableBytesWritable row, Result value,
146       Context context)
147     throws IOException {
148       try {
149         writeResult(row, value, context);
150       } catch (InterruptedException e) {
151         e.printStackTrace();
152       }
153     }
154 
155     private void writeResult(ImmutableBytesWritable key, Result result, Context context)
156     throws IOException, InterruptedException {
157       Put put = null;
158       Delete delete = null;
159       if (LOG.isTraceEnabled()) {
160         LOG.trace("Considering the row."
161             + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
162       }
163       if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
164         processKV(key, result, context, put, delete);
165       }
166     }
167 
168     protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
169         Delete delete) throws IOException, InterruptedException {
170       for (Cell kv : result.rawCells()) {
171         kv = filterKv(filter, kv);
172         // skip if we filter it out
173         if (kv == null) continue;
174 
175         kv = convertKv(kv, cfRenameMap);
176         // Deletes and Puts are gathered and written when finished
177         /*
178          * If there are sequence of mutations and tombstones in an Export, and after Import the same
179          * sequence should be restored as it is. If we combine all Delete tombstones into single
180          * request then there is chance of ignoring few DeleteFamily tombstones, because if we
181          * submit multiple DeleteFamily tombstones in single Delete request then we are maintaining
182          * only newest in hbase table and ignoring other. Check - HBASE-12065
183          */
184         if (CellUtil.isDeleteFamily(kv)) {
185           Delete deleteFamily = new Delete(key.get());
186           deleteFamily.addDeleteMarker(kv);
187           if (durability != null) {
188             deleteFamily.setDurability(durability);
189           }
190           deleteFamily.setClusterIds(clusterIds);
191           context.write(key, deleteFamily);
192         } else if (CellUtil.isDelete(kv)) {
193           if (delete == null) {
194             delete = new Delete(key.get());
195           }
196           delete.addDeleteMarker(kv);
197         } else {
198           if (put == null) {
199             put = new Put(key.get());
200           }
201           addPutToKv(put, kv);
202         }
203       }
204       if (put != null) {
205         if (durability != null) {
206           put.setDurability(durability);
207         }
208         put.setClusterIds(clusterIds);
209         context.write(key, put);
210       }
211       if (delete != null) {
212         if (durability != null) {
213           delete.setDurability(durability);
214         }
215         delete.setClusterIds(clusterIds);
216         context.write(key, delete);
217       }
218     }
219 
220     protected void addPutToKv(Put put, Cell kv) throws IOException {
221       put.add(kv);
222     }
223 
224     @Override
225     public void setup(Context context) {
226       Configuration conf = context.getConfiguration();
227       cfRenameMap = createCfRenameMap(conf);
228       filter = instantiateFilter(conf);
229       String durabilityStr = conf.get(WAL_DURABILITY);
230       if(durabilityStr != null){
231         durability = Durability.valueOf(durabilityStr.toUpperCase());
232       }
233       // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
234       ZooKeeperWatcher zkw = null;
235       Exception ex = null;
236       try {
237         zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
238         clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
239       } catch (ZooKeeperConnectionException e) {
240         ex = e;
241         LOG.error("Problem connecting to ZooKeper during task setup", e);
242       } catch (KeeperException e) {
243         ex = e;
244         LOG.error("Problem reading ZooKeeper data during task setup", e);
245       } catch (IOException e) {
246         ex = e;
247         LOG.error("Problem setting up task", e);
248       } finally {
249         if (zkw != null) zkw.close();
250       }
251       if (clusterIds == null) {
252         // exit early if setup fails
253         throw new RuntimeException(ex);
254       }
255     }
256   }
257 
258   /**
259    * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
260    * optionally not include in the job output
261    * @param conf {@link Configuration} from which to load the filter
262    * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
263    * @throws IllegalArgumentException if the filter is misconfigured
264    */
265   public static Filter instantiateFilter(Configuration conf) {
266     // get the filter, if it was configured    
267     Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
268     if (filterClass == null) {
269       LOG.debug("No configured filter class, accepting all keyvalues.");
270       return null;
271     }
272     LOG.debug("Attempting to create filter:" + filterClass);
273     String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
274     ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
275     try {
276       Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
277       return (Filter) m.invoke(null, quotedArgs);
278     } catch (IllegalAccessException e) {
279       LOG.error("Couldn't instantiate filter!", e);
280       throw new RuntimeException(e);
281     } catch (SecurityException e) {
282       LOG.error("Couldn't instantiate filter!", e);
283       throw new RuntimeException(e);
284     } catch (NoSuchMethodException e) {
285       LOG.error("Couldn't instantiate filter!", e);
286       throw new RuntimeException(e);
287     } catch (IllegalArgumentException e) {
288       LOG.error("Couldn't instantiate filter!", e);
289       throw new RuntimeException(e);
290     } catch (InvocationTargetException e) {
291       LOG.error("Couldn't instantiate filter!", e);
292       throw new RuntimeException(e);
293     }
294   }
295 
296   private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
297     ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
298     for (String stringArg : stringArgs) {
299       // all the filters' instantiation methods expected quoted args since they are coming from
300       // the shell, so add them here, though it shouldn't really be needed :-/
301       quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
302     }
303     return quotedArgs;
304   }
305 
306   /**
307    * Attempt to filter out the keyvalue
308    * @param kv {@link KeyValue} on which to apply the filter
309    * @return <tt>null</tt> if the key should not be written, otherwise returns the original
310    *         {@link KeyValue}
311    */
312   public static Cell filterKv(Filter filter, Cell kv) throws IOException {
313     // apply the filter and skip this kv if the filter doesn't apply
314     if (filter != null) {
315       Filter.ReturnCode code = filter.filterKeyValue(kv);
316       if (LOG.isTraceEnabled()) {
317         LOG.trace("Filter returned:" + code + " for the key value:" + kv);
318       }
319       // if its not an accept type, then skip this kv
320       if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
321           .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
322         return null;
323       }
324     }
325     return kv;
326   }
327 
328   // helper: create a new KeyValue based on CF rename map
329   private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
330     if(cfRenameMap != null) {
331       // If there's a rename mapping for this CF, create a new KeyValue
332       byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
333       if(newCfName != null) {
334           kv = new KeyValue(kv.getRowArray(), // row buffer 
335                   kv.getRowOffset(),        // row offset
336                   kv.getRowLength(),        // row length
337                   newCfName,                // CF buffer
338                   0,                        // CF offset 
339                   newCfName.length,         // CF length 
340                   kv.getQualifierArray(),   // qualifier buffer
341                   kv.getQualifierOffset(),  // qualifier offset
342                   kv.getQualifierLength(),  // qualifier length
343                   kv.getTimestamp(),        // timestamp
344                   KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
345                   kv.getValueArray(),       // value buffer 
346                   kv.getValueOffset(),      // value offset
347                   kv.getValueLength());     // value length
348       }
349     }
350     return kv;
351   }
352 
353   // helper: make a map from sourceCfName to destCfName by parsing a config key
354   private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
355     Map<byte[], byte[]> cfRenameMap = null;
356     String allMappingsPropVal = conf.get(CF_RENAME_PROP);
357     if(allMappingsPropVal != null) {
358       // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
359       String[] allMappings = allMappingsPropVal.split(",");
360       for (String mapping: allMappings) {
361         if(cfRenameMap == null) {
362             cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
363         }
364         String [] srcAndDest = mapping.split(":");
365         if(srcAndDest.length != 2) {
366             continue;
367         }
368         cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
369       }
370     }
371     return cfRenameMap;
372   }
373 
374   /**
375    * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
376    * the mapper how to rename column families.
377    * 
378    * <p>Alternately, instead of calling this function, you could set the configuration key 
379    * {@link #CF_RENAME_PROP} yourself. The value should look like 
380    * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
381    * the mapper behavior.
382    * 
383    * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
384    *  set
385    * @param renameMap a mapping from source CF names to destination CF names
386    */
387   static public void configureCfRenaming(Configuration conf, 
388           Map<String, String> renameMap) {
389     StringBuilder sb = new StringBuilder();
390     for(Map.Entry<String,String> entry: renameMap.entrySet()) {
391       String sourceCf = entry.getKey();
392       String destCf = entry.getValue();
393 
394       if(sourceCf.contains(":") || sourceCf.contains(",") || 
395               destCf.contains(":") || destCf.contains(",")) {
396         throw new IllegalArgumentException("Illegal character in CF names: " 
397               + sourceCf + ", " + destCf);
398       }
399 
400       if(sb.length() != 0) {
401         sb.append(",");
402       }
403       sb.append(sourceCf + ":" + destCf);
404     }
405     conf.set(CF_RENAME_PROP, sb.toString());
406   }
407 
408   /**
409    * Add a Filter to be instantiated on import
410    * @param conf Configuration to update (will be passed to the job)
411    * @param clazz {@link Filter} subclass to instantiate on the server.
412    * @param filterArgs List of arguments to pass to the filter on instantiation
413    */
414   public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
415       List<String> filterArgs) throws IOException {
416     conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
417     conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
418   }
419 
420   /**
421    * Sets up the actual job.
422    * @param conf The current configuration.
423    * @param args The command line parameters.
424    * @return The newly created job.
425    * @throws IOException When setting up the job fails.
426    */
427   public static Job createSubmittableJob(Configuration conf, String[] args)
428   throws IOException {
429     TableName tableName = TableName.valueOf(args[0]);
430     conf.set(TABLE_NAME, tableName.getNameAsString());
431     Path inputDir = new Path(args[1]);
432     Job job = new Job(conf, NAME + "_" + tableName);
433     job.setJarByClass(Importer.class);
434     FileInputFormat.setInputPaths(job, inputDir);
435     job.setInputFormatClass(SequenceFileInputFormat.class);
436     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
437 
438     // make sure we get the filter in the jars
439     try {
440       Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
441       if (filter != null) {
442         TableMapReduceUtil.addDependencyJars(conf, filter);
443       }
444     } catch (Exception e) {
445       throw new IOException(e);
446     }
447 
448     if (hfileOutPath != null) {
449       job.setMapperClass(KeyValueImporter.class);
450       try (Connection conn = ConnectionFactory.createConnection(conf); 
451           Table table = conn.getTable(tableName);
452           RegionLocator regionLocator = conn.getRegionLocator(tableName)){
453         job.setReducerClass(KeyValueSortReducer.class);
454         Path outputDir = new Path(hfileOutPath);
455         FileOutputFormat.setOutputPath(job, outputDir);
456         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
457         job.setMapOutputValueClass(KeyValue.class);
458         HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
459         TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
460             com.google.common.base.Preconditions.class);
461       }
462     } else {
463       // No reducers.  Just write straight to table.  Call initTableReducerJob
464       // because it sets up the TableOutputFormat.
465       job.setMapperClass(Importer.class);
466       TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
467       job.setNumReduceTasks(0);
468     }
469     return job;
470   }
471 
472   /*
473    * @param errorMsg Error message.  Can be null.
474    */
475   private static void usage(final String errorMsg) {
476     if (errorMsg != null && errorMsg.length() > 0) {
477       System.err.println("ERROR: " + errorMsg);
478     }
479     System.err.println("Usage: Import [options] <tablename> <inputdir>");
480     System.err.println("By default Import will load data directly into HBase. To instead generate");
481     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
482     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
483     System.err
484         .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
485     System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
486     System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
487     System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
488         + CF_RENAME_PROP + " property. Futher, filters will only use the"
489         + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
490         + " whether the current row needs to be ignored completely for processing and "
491         + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
492         + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
493         + " the KeyValue.");
494     System.err.println("To import data exported from HBase 0.94, use");
495     System.err.println("  -Dhbase.import.version=0.94");
496     System.err.println("For performance consider the following options:\n"
497         + "  -Dmapreduce.map.speculative=false\n"
498         + "  -Dmapreduce.reduce.speculative=false\n"
499         + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
500             +" Allowed values are the supported durability values"
501             +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
502   }
503 
504   /**
505    * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
506    * need to flush all the regions of the table as the data is held in memory and is also not
507    * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the
508    * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL}
509    */
510   public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
511       InterruptedException {
512     String tableName = conf.get(TABLE_NAME);
513     HBaseAdmin hAdmin = null;
514     String durability = conf.get(WAL_DURABILITY);
515     // Need to flush if the data is written to hbase and skip wal is enabled.
516     if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
517         && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
518       try {
519         hAdmin = new HBaseAdmin(conf);
520         hAdmin.flush(tableName);
521       } finally {
522         if (hAdmin != null) {
523           hAdmin.close();
524         }
525       }
526     }
527   }
528 
529   /**
530    * Main entry point.
531    *
532    * @param args  The command line parameters.
533    * @throws Exception When running the job fails.
534    */
535   public static void main(String[] args) throws Exception {
536     Configuration conf = HBaseConfiguration.create();
537     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
538     if (otherArgs.length < 2) {
539       usage("Wrong number of arguments: " + otherArgs.length);
540       System.exit(-1);
541     }
542     String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
543     if (inputVersionString != null) {
544       conf.set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
545     }
546     Job job = createSubmittableJob(conf, otherArgs);
547     boolean isJobSuccessful = job.waitForCompletion(true);
548     if(isJobSuccessful){
549       // Flush all the regions of the table
550       flushRegionsIfNecessary(conf);
551     }
552     long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
553     long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
554     if (outputRecords < inputRecords) {
555       System.err.println("Warning, not all records were imported (maybe filtered out).");
556       if (outputRecords == 0) {
557         System.err.println("If the data was exported from HBase 0.94 "+
558             "consider using -Dhbase.import.version=0.94.");
559       }
560     }
561 
562     System.exit(isJobSuccessful ? 0 : 1);
563   }
564 }