View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.conf.Configured;
25  import org.apache.hadoop.fs.Path;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.CellUtil;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.KeyValueUtil;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.classification.InterfaceStability;
35  import org.apache.hadoop.hbase.client.Admin;
36  import org.apache.hadoop.hbase.client.Connection;
37  import org.apache.hadoop.hbase.client.ConnectionFactory;
38  import org.apache.hadoop.hbase.client.Delete;
39  import org.apache.hadoop.hbase.client.Durability;
40  import org.apache.hadoop.hbase.client.HBaseAdmin;
41  import org.apache.hadoop.hbase.client.Mutation;
42  import org.apache.hadoop.hbase.client.Put;
43  import org.apache.hadoop.hbase.client.RegionLocator;
44  import org.apache.hadoop.hbase.client.Result;
45  import org.apache.hadoop.hbase.client.Table;
46  import org.apache.hadoop.hbase.filter.Filter;
47  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
50  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
51  import org.apache.hadoop.mapreduce.Job;
52  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
53  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
54  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
55  import org.apache.hadoop.util.GenericOptionsParser;
56  import org.apache.hadoop.util.Tool;
57  import org.apache.hadoop.util.ToolRunner;
58  import org.apache.zookeeper.KeeperException;
59  
60  import java.io.IOException;
61  import java.lang.reflect.InvocationTargetException;
62  import java.lang.reflect.Method;
63  import java.util.ArrayList;
64  import java.util.Collections;
65  import java.util.List;
66  import java.util.Map;
67  import java.util.TreeMap;
68  import java.util.UUID;
69  
70  
71  /**
72   * Import data written by {@link Export}.
73   */
74  @InterfaceAudience.Public
75  @InterfaceStability.Stable
76  public class Import extends Configured implements Tool {
77    private static final Log LOG = LogFactory.getLog(Import.class);
78    final static String NAME = "import";
79    public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
80    public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
81    public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
82    public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
83    public final static String TABLE_NAME = "import.table.name";
84    public final static String WAL_DURABILITY = "import.wal.durability";
85  
86    private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
87  
88    /**
89     * A mapper that just writes out KeyValues.
90     */
91    public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
92      private Map<byte[], byte[]> cfRenameMap;
93      private Filter filter;
94      private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
95  
96      /**
97       * @param row  The current table row key.
98       * @param value  The columns.
99       * @param context  The current context.
100      * @throws IOException When something is broken with the data.
101      */
102     @Override
103     public void map(ImmutableBytesWritable row, Result value,
104       Context context)
105     throws IOException {
106       try {
107         if (LOG.isTraceEnabled()) {
108           LOG.trace("Considering the row."
109               + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
110         }
111         if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
112           for (Cell kv : value.rawCells()) {
113             kv = filterKv(filter, kv);
114             // skip if we filtered it out
115             if (kv == null) continue;
116             // TODO get rid of ensureKeyValue
117             context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
118           }
119         }
120       } catch (InterruptedException e) {
121         e.printStackTrace();
122       }
123     }
124 
125     @Override
126     public void setup(Context context) {
127       cfRenameMap = createCfRenameMap(context.getConfiguration());
128       filter = instantiateFilter(context.getConfiguration());
129     }
130   }
131 
132   /**
133    * Write table content out to files in hdfs.
134    */
135   public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
136     private Map<byte[], byte[]> cfRenameMap;
137     private List<UUID> clusterIds;
138     private Filter filter;
139     private Durability durability;
140 
141     /**
142      * @param row  The current table row key.
143      * @param value  The columns.
144      * @param context  The current context.
145      * @throws IOException When something is broken with the data.
146      */
147     @Override
148     public void map(ImmutableBytesWritable row, Result value,
149       Context context)
150     throws IOException {
151       try {
152         writeResult(row, value, context);
153       } catch (InterruptedException e) {
154         e.printStackTrace();
155       }
156     }
157 
158     private void writeResult(ImmutableBytesWritable key, Result result, Context context)
159     throws IOException, InterruptedException {
160       Put put = null;
161       Delete delete = null;
162       if (LOG.isTraceEnabled()) {
163         LOG.trace("Considering the row."
164             + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
165       }
166       if (filter == null || !filter.filterRowKey(key.get(), key.getOffset(), key.getLength())) {
167         processKV(key, result, context, put, delete);
168       }
169     }
170 
171     protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
172         Delete delete) throws IOException, InterruptedException {
173       for (Cell kv : result.rawCells()) {
174         kv = filterKv(filter, kv);
175         // skip if we filter it out
176         if (kv == null) continue;
177 
178         kv = convertKv(kv, cfRenameMap);
179         // Deletes and Puts are gathered and written when finished
180         /*
181          * If there are sequence of mutations and tombstones in an Export, and after Import the same
182          * sequence should be restored as it is. If we combine all Delete tombstones into single
183          * request then there is chance of ignoring few DeleteFamily tombstones, because if we
184          * submit multiple DeleteFamily tombstones in single Delete request then we are maintaining
185          * only newest in hbase table and ignoring other. Check - HBASE-12065
186          */
187         if (CellUtil.isDeleteFamily(kv)) {
188           Delete deleteFamily = new Delete(key.get());
189           deleteFamily.addDeleteMarker(kv);
190           if (durability != null) {
191             deleteFamily.setDurability(durability);
192           }
193           deleteFamily.setClusterIds(clusterIds);
194           context.write(key, deleteFamily);
195         } else if (CellUtil.isDelete(kv)) {
196           if (delete == null) {
197             delete = new Delete(key.get());
198           }
199           delete.addDeleteMarker(kv);
200         } else {
201           if (put == null) {
202             put = new Put(key.get());
203           }
204           addPutToKv(put, kv);
205         }
206       }
207       if (put != null) {
208         if (durability != null) {
209           put.setDurability(durability);
210         }
211         put.setClusterIds(clusterIds);
212         context.write(key, put);
213       }
214       if (delete != null) {
215         if (durability != null) {
216           delete.setDurability(durability);
217         }
218         delete.setClusterIds(clusterIds);
219         context.write(key, delete);
220       }
221     }
222 
223     protected void addPutToKv(Put put, Cell kv) throws IOException {
224       put.add(kv);
225     }
226 
227     @Override
228     public void setup(Context context) {
229       Configuration conf = context.getConfiguration();
230       cfRenameMap = createCfRenameMap(conf);
231       filter = instantiateFilter(conf);
232       String durabilityStr = conf.get(WAL_DURABILITY);
233       if(durabilityStr != null){
234         durability = Durability.valueOf(durabilityStr.toUpperCase());
235       }
236       // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
237       ZooKeeperWatcher zkw = null;
238       Exception ex = null;
239       try {
240         zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
241         clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
242       } catch (ZooKeeperConnectionException e) {
243         ex = e;
244         LOG.error("Problem connecting to ZooKeper during task setup", e);
245       } catch (KeeperException e) {
246         ex = e;
247         LOG.error("Problem reading ZooKeeper data during task setup", e);
248       } catch (IOException e) {
249         ex = e;
250         LOG.error("Problem setting up task", e);
251       } finally {
252         if (zkw != null) zkw.close();
253       }
254       if (clusterIds == null) {
255         // exit early if setup fails
256         throw new RuntimeException(ex);
257       }
258     }
259   }
260 
261   /**
262    * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
263    * optionally not include in the job output
264    * @param conf {@link Configuration} from which to load the filter
265    * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
266    * @throws IllegalArgumentException if the filter is misconfigured
267    */
268   public static Filter instantiateFilter(Configuration conf) {
269     // get the filter, if it was configured    
270     Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
271     if (filterClass == null) {
272       LOG.debug("No configured filter class, accepting all keyvalues.");
273       return null;
274     }
275     LOG.debug("Attempting to create filter:" + filterClass);
276     String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
277     ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
278     try {
279       Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
280       return (Filter) m.invoke(null, quotedArgs);
281     } catch (IllegalAccessException e) {
282       LOG.error("Couldn't instantiate filter!", e);
283       throw new RuntimeException(e);
284     } catch (SecurityException e) {
285       LOG.error("Couldn't instantiate filter!", e);
286       throw new RuntimeException(e);
287     } catch (NoSuchMethodException e) {
288       LOG.error("Couldn't instantiate filter!", e);
289       throw new RuntimeException(e);
290     } catch (IllegalArgumentException e) {
291       LOG.error("Couldn't instantiate filter!", e);
292       throw new RuntimeException(e);
293     } catch (InvocationTargetException e) {
294       LOG.error("Couldn't instantiate filter!", e);
295       throw new RuntimeException(e);
296     }
297   }
298 
299   private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
300     ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
301     for (String stringArg : stringArgs) {
302       // all the filters' instantiation methods expected quoted args since they are coming from
303       // the shell, so add them here, though it shouldn't really be needed :-/
304       quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
305     }
306     return quotedArgs;
307   }
308 
309   /**
310    * Attempt to filter out the keyvalue
311    * @param kv {@link KeyValue} on which to apply the filter
312    * @return <tt>null</tt> if the key should not be written, otherwise returns the original
313    *         {@link KeyValue}
314    */
315   public static Cell filterKv(Filter filter, Cell kv) throws IOException {
316     // apply the filter and skip this kv if the filter doesn't apply
317     if (filter != null) {
318       Filter.ReturnCode code = filter.filterKeyValue(kv);
319       if (LOG.isTraceEnabled()) {
320         LOG.trace("Filter returned:" + code + " for the key value:" + kv);
321       }
322       // if its not an accept type, then skip this kv
323       if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
324           .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
325         return null;
326       }
327     }
328     return kv;
329   }
330 
331   // helper: create a new KeyValue based on CF rename map
332   private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
333     if(cfRenameMap != null) {
334       // If there's a rename mapping for this CF, create a new KeyValue
335       byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
336       if(newCfName != null) {
337           kv = new KeyValue(kv.getRowArray(), // row buffer 
338                   kv.getRowOffset(),        // row offset
339                   kv.getRowLength(),        // row length
340                   newCfName,                // CF buffer
341                   0,                        // CF offset 
342                   newCfName.length,         // CF length 
343                   kv.getQualifierArray(),   // qualifier buffer
344                   kv.getQualifierOffset(),  // qualifier offset
345                   kv.getQualifierLength(),  // qualifier length
346                   kv.getTimestamp(),        // timestamp
347                   KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
348                   kv.getValueArray(),       // value buffer 
349                   kv.getValueOffset(),      // value offset
350                   kv.getValueLength());     // value length
351       }
352     }
353     return kv;
354   }
355 
356   // helper: make a map from sourceCfName to destCfName by parsing a config key
357   private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
358     Map<byte[], byte[]> cfRenameMap = null;
359     String allMappingsPropVal = conf.get(CF_RENAME_PROP);
360     if(allMappingsPropVal != null) {
361       // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
362       String[] allMappings = allMappingsPropVal.split(",");
363       for (String mapping: allMappings) {
364         if(cfRenameMap == null) {
365             cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
366         }
367         String [] srcAndDest = mapping.split(":");
368         if(srcAndDest.length != 2) {
369             continue;
370         }
371         cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
372       }
373     }
374     return cfRenameMap;
375   }
376 
377   /**
378    * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
379    * the mapper how to rename column families.
380    * 
381    * <p>Alternately, instead of calling this function, you could set the configuration key 
382    * {@link #CF_RENAME_PROP} yourself. The value should look like 
383    * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
384    * the mapper behavior.
385    * 
386    * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
387    *  set
388    * @param renameMap a mapping from source CF names to destination CF names
389    */
390   static public void configureCfRenaming(Configuration conf, 
391           Map<String, String> renameMap) {
392     StringBuilder sb = new StringBuilder();
393     for(Map.Entry<String,String> entry: renameMap.entrySet()) {
394       String sourceCf = entry.getKey();
395       String destCf = entry.getValue();
396 
397       if(sourceCf.contains(":") || sourceCf.contains(",") || 
398               destCf.contains(":") || destCf.contains(",")) {
399         throw new IllegalArgumentException("Illegal character in CF names: " 
400               + sourceCf + ", " + destCf);
401       }
402 
403       if(sb.length() != 0) {
404         sb.append(",");
405       }
406       sb.append(sourceCf + ":" + destCf);
407     }
408     conf.set(CF_RENAME_PROP, sb.toString());
409   }
410 
411   /**
412    * Add a Filter to be instantiated on import
413    * @param conf Configuration to update (will be passed to the job)
414    * @param clazz {@link Filter} subclass to instantiate on the server.
415    * @param filterArgs List of arguments to pass to the filter on instantiation
416    */
417   public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
418       List<String> filterArgs) throws IOException {
419     conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
420     conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
421   }
422 
423   /**
424    * Sets up the actual job.
425    * @param conf The current configuration.
426    * @param args The command line parameters.
427    * @return The newly created job.
428    * @throws IOException When setting up the job fails.
429    */
430   public static Job createSubmittableJob(Configuration conf, String[] args)
431   throws IOException {
432     TableName tableName = TableName.valueOf(args[0]);
433     conf.set(TABLE_NAME, tableName.getNameAsString());
434     Path inputDir = new Path(args[1]);
435     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
436     job.setJarByClass(Importer.class);
437     FileInputFormat.setInputPaths(job, inputDir);
438     job.setInputFormatClass(SequenceFileInputFormat.class);
439     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
440 
441     // make sure we get the filter in the jars
442     try {
443       Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
444       if (filter != null) {
445         TableMapReduceUtil.addDependencyJars(conf, filter);
446       }
447     } catch (Exception e) {
448       throw new IOException(e);
449     }
450 
451     if (hfileOutPath != null) {
452       job.setMapperClass(KeyValueImporter.class);
453       try (Connection conn = ConnectionFactory.createConnection(conf); 
454           Table table = conn.getTable(tableName);
455           RegionLocator regionLocator = conn.getRegionLocator(tableName)){
456         job.setReducerClass(KeyValueSortReducer.class);
457         Path outputDir = new Path(hfileOutPath);
458         FileOutputFormat.setOutputPath(job, outputDir);
459         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
460         job.setMapOutputValueClass(KeyValue.class);
461         HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
462         TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
463             com.google.common.base.Preconditions.class);
464       }
465     } else {
466       // No reducers.  Just write straight to table.  Call initTableReducerJob
467       // because it sets up the TableOutputFormat.
468       job.setMapperClass(Importer.class);
469       TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
470       job.setNumReduceTasks(0);
471     }
472     return job;
473   }
474 
475   /*
476    * @param errorMsg Error message.  Can be null.
477    */
478   private static void usage(final String errorMsg) {
479     if (errorMsg != null && errorMsg.length() > 0) {
480       System.err.println("ERROR: " + errorMsg);
481     }
482     System.err.println("Usage: Import [options] <tablename> <inputdir>");
483     System.err.println("By default Import will load data directly into HBase. To instead generate");
484     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
485     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
486     System.err
487         .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
488     System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
489     System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
490     System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
491         + CF_RENAME_PROP + " property. Futher, filters will only use the"
492         + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
493         + " whether the current row needs to be ignored completely for processing and "
494         + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
495         + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
496         + " the KeyValue.");
497     System.err.println("   -D " + JOB_NAME_CONF_KEY
498         + "=jobName - use the specified mapreduce job name for the import");
499     System.err.println("For performance consider the following options:\n"
500         + "  -Dmapreduce.map.speculative=false\n"
501         + "  -Dmapreduce.reduce.speculative=false\n"
502         + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
503             +" Allowed values are the supported durability values"
504             +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
505   }
506 
507   /**
508    * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
509    * need to flush all the regions of the table as the data is held in memory and is also not
510    * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the
511    * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL}
512    */
513   public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
514       InterruptedException {
515     String tableName = conf.get(TABLE_NAME);
516     Admin hAdmin = null;
517     Connection connection = null;
518     String durability = conf.get(WAL_DURABILITY);
519     // Need to flush if the data is written to hbase and skip wal is enabled.
520     if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
521         && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
522       try {
523         connection = ConnectionFactory.createConnection(conf);
524         hAdmin = connection.getAdmin();
525         hAdmin.flush(TableName.valueOf(tableName));
526       } finally {
527         if (hAdmin != null) {
528           hAdmin.close();
529         }
530         if (connection != null) {
531           connection.close();
532         }
533       }
534     }
535   }
536 
537   @Override
538   public int run(String[] args) throws Exception {
539     String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
540     if (otherArgs.length < 2) {
541       usage("Wrong number of arguments: " + otherArgs.length);
542       return -1;
543     }
544     String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
545     if (inputVersionString != null) {
546       getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
547     }
548     Job job = createSubmittableJob(getConf(), otherArgs);
549     boolean isJobSuccessful = job.waitForCompletion(true);
550     if(isJobSuccessful){
551       // Flush all the regions of the table
552       flushRegionsIfNecessary(getConf());
553     }
554     return (isJobSuccessful ? 0 : 1);
555   }
556 
557   /**
558    * Main entry point.
559    * @param args The command line parameters.
560    * @throws Exception When running the job fails.
561    */
562   public static void main(String[] args) throws Exception {
563     int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
564     System.exit(errCode);
565   }
566 
567 }