View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.lang.reflect.InvocationTargetException;
27  import java.lang.reflect.Method;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.TreeMap;
33  import java.util.UUID;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.conf.Configured;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellComparator;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.HBaseConfiguration;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.KeyValueUtil;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
49  import org.apache.hadoop.hbase.classification.InterfaceAudience;
50  import org.apache.hadoop.hbase.classification.InterfaceStability;
51  import org.apache.hadoop.hbase.client.Admin;
52  import org.apache.hadoop.hbase.client.Connection;
53  import org.apache.hadoop.hbase.client.ConnectionFactory;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Durability;
56  import org.apache.hadoop.hbase.client.Mutation;
57  import org.apache.hadoop.hbase.client.Put;
58  import org.apache.hadoop.hbase.client.RegionLocator;
59  import org.apache.hadoop.hbase.client.Result;
60  import org.apache.hadoop.hbase.client.Table;
61  import org.apache.hadoop.hbase.filter.Filter;
62  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
63  import org.apache.hadoop.hbase.util.Bytes;
64  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
65  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
66  import org.apache.hadoop.io.RawComparator;
67  import org.apache.hadoop.io.WritableComparable;
68  import org.apache.hadoop.io.WritableComparator;
69  import org.apache.hadoop.mapreduce.Job;
70  import org.apache.hadoop.mapreduce.Partitioner;
71  import org.apache.hadoop.mapreduce.Reducer;
72  import org.apache.hadoop.mapreduce.TaskCounter;
73  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
74  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
75  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
76  import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
77  import org.apache.hadoop.util.Tool;
78  import org.apache.hadoop.util.ToolRunner;
79  import org.apache.zookeeper.KeeperException;
80  
81  
82  /**
83   * Import data written by {@link Export}.
84   */
85  @InterfaceAudience.Public
86  @InterfaceStability.Stable
87  public class Import extends Configured implements Tool {
88    private static final Log LOG = LogFactory.getLog(Import.class);
89    final static String NAME = "import";
90    public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
91    public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
92    public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
93    public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
94    public final static String TABLE_NAME = "import.table.name";
95    public final static String WAL_DURABILITY = "import.wal.durability";
96    public final static String HAS_LARGE_RESULT= "import.bulk.hasLargeResult";
97  
98    private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
99  
100   public static class KeyValueWritableComparablePartitioner 
101       extends Partitioner<KeyValueWritableComparable, KeyValue> {
102     private static KeyValueWritableComparable[] START_KEYS = null;
103     @Override
104     public int getPartition(KeyValueWritableComparable key, KeyValue value,
105         int numPartitions) {
106       for (int i = 0; i < START_KEYS.length; ++i) {
107         if (key.compareTo(START_KEYS[i]) <= 0) {
108           return i;
109         }
110       }
111       return START_KEYS.length;
112     }
113     
114   }
115   
116   public static class KeyValueWritableComparable 
117       implements WritableComparable<KeyValueWritableComparable> {
118 
119     private KeyValue kv = null;
120     
121     static {                                       
122       // register this comparator
123       WritableComparator.define(KeyValueWritableComparable.class, 
124           new KeyValueWritableComparator());
125     }
126     
127     public KeyValueWritableComparable() {
128     }
129     
130     public KeyValueWritableComparable(KeyValue kv) {
131       this.kv = kv;
132     }
133     
134     @Override
135     public void write(DataOutput out) throws IOException {
136       KeyValue.write(kv, out);
137     }
138 
139     @Override
140     public void readFields(DataInput in) throws IOException {
141       kv = KeyValue.create(in);
142     }
143 
144     @Override
145     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
146       justification="This is wrong, yes, but we should be purging Writables, not fixing them")
147     public int compareTo(KeyValueWritableComparable o) {
148       return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
149     }
150     
151     public static class KeyValueWritableComparator extends WritableComparator {
152 
153       @Override
154       public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
155         try {
156           KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
157           kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
158           KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
159           kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
160           return compare(kv1, kv2);
161         } catch (IOException e) {
162           throw new RuntimeException(e);
163         } 
164       }
165       
166     }
167     
168   }
169   
170   public static class KeyValueReducer
171       extends
172       Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
173     protected void reduce(
174         KeyValueWritableComparable row,
175         Iterable<KeyValue> kvs,
176         Reducer<KeyValueWritableComparable,
177           KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
178         throws java.io.IOException, InterruptedException {
179       int index = 0;
180       for (KeyValue kv : kvs) {
181         context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
182         if (++index % 100 == 0)
183           context.setStatus("Wrote " + index + " KeyValues, "
184               + "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray())); 
185       }
186     }
187   }
188   
189   public static class KeyValueSortImporter 
190       extends TableMapper<KeyValueWritableComparable, KeyValue> {
191     private Map<byte[], byte[]> cfRenameMap;
192     private Filter filter;
193     private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
194 
195     /**
196      * @param row  The current table row key.
197      * @param value  The columns.
198      * @param context  The current context.
199      * @throws IOException When something is broken with the data.
200      */
201     @Override
202     public void map(ImmutableBytesWritable row, Result value,
203       Context context)
204     throws IOException {
205       try {
206         if (LOG.isTraceEnabled()) {
207           LOG.trace("Considering the row."
208               + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
209         }
210         if (filter == null
211             || !filter.filterRowKey(KeyValueUtil.createFirstOnRow(row.get(), row.getOffset(),
212                 (short) row.getLength()))) {
213           for (Cell kv : value.rawCells()) {
214             kv = filterKv(filter, kv);
215             // skip if we filtered it out
216             if (kv == null) continue;
217             // TODO get rid of ensureKeyValue
218             KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
219             context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret); 
220           }
221         }
222       } catch (InterruptedException e) {
223         e.printStackTrace();
224       }
225     }
226     
227     @Override
228     public void setup(Context context) throws IOException { 
229       cfRenameMap = createCfRenameMap(context.getConfiguration());
230       filter = instantiateFilter(context.getConfiguration());
231       int reduceNum = context.getNumReduceTasks();
232       Configuration conf = context.getConfiguration();
233       TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
234       try (Connection conn = ConnectionFactory.createConnection(conf); 
235           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
236         byte[][] startKeys = regionLocator.getStartKeys();
237         if (startKeys.length != reduceNum) {
238           throw new IOException("Region split after job initialization");
239         }
240         KeyValueWritableComparable[] startKeyWraps = 
241             new KeyValueWritableComparable[startKeys.length - 1];
242         for (int i = 1; i < startKeys.length; ++i) {
243           startKeyWraps[i - 1] = 
244               new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
245         }
246         KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
247       }
248     }
249   }
250   
251   /**
252    * A mapper that just writes out KeyValues.
253    */
254   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
255       justification="Writables are going away and this has been this way forever")
256   public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
257     private Map<byte[], byte[]> cfRenameMap;
258     private Filter filter;
259     private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
260 
261     /**
262      * @param row  The current table row key.
263      * @param value  The columns.
264      * @param context  The current context.
265      * @throws IOException When something is broken with the data.
266      */
267     @Override
268     public void map(ImmutableBytesWritable row, Result value,
269       Context context)
270     throws IOException {
271       try {
272         if (LOG.isTraceEnabled()) {
273           LOG.trace("Considering the row."
274               + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
275         }
276         if (filter == null
277             || !filter.filterRowKey(KeyValueUtil.createFirstOnRow(row.get(), row.getOffset(),
278                 (short) row.getLength()))) {
279           for (Cell kv : value.rawCells()) {
280             kv = filterKv(filter, kv);
281             // skip if we filtered it out
282             if (kv == null) continue;
283             // TODO get rid of ensureKeyValue
284             context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
285           }
286         }
287       } catch (InterruptedException e) {
288         e.printStackTrace();
289       }
290     }
291 
292     @Override
293     public void setup(Context context) {
294       cfRenameMap = createCfRenameMap(context.getConfiguration());
295       filter = instantiateFilter(context.getConfiguration());
296     }
297   }
298 
299   /**
300    * Write table content out to files in hdfs.
301    */
302   public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
303     private Map<byte[], byte[]> cfRenameMap;
304     private List<UUID> clusterIds;
305     private Filter filter;
306     private Durability durability;
307 
308     /**
309      * @param row  The current table row key.
310      * @param value  The columns.
311      * @param context  The current context.
312      * @throws IOException When something is broken with the data.
313      */
314     @Override
315     public void map(ImmutableBytesWritable row, Result value,
316       Context context)
317     throws IOException {
318       try {
319         writeResult(row, value, context);
320       } catch (InterruptedException e) {
321         e.printStackTrace();
322       }
323     }
324 
325     private void writeResult(ImmutableBytesWritable key, Result result, Context context)
326     throws IOException, InterruptedException {
327       Put put = null;
328       Delete delete = null;
329       if (LOG.isTraceEnabled()) {
330         LOG.trace("Considering the row."
331             + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
332       }
333       if (filter == null
334           || !filter.filterRowKey(KeyValueUtil.createFirstOnRow(key.get(), key.getOffset(),
335               (short) key.getLength()))) {
336         processKV(key, result, context, put, delete);
337       }
338     }
339 
340     protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
341         Delete delete) throws IOException, InterruptedException {
342       for (Cell kv : result.rawCells()) {
343         kv = filterKv(filter, kv);
344         // skip if we filter it out
345         if (kv == null) continue;
346 
347         kv = convertKv(kv, cfRenameMap);
348         // Deletes and Puts are gathered and written when finished
349         /*
350          * If there are sequence of mutations and tombstones in an Export, and after Import the same
351          * sequence should be restored as it is. If we combine all Delete tombstones into single
352          * request then there is chance of ignoring few DeleteFamily tombstones, because if we
353          * submit multiple DeleteFamily tombstones in single Delete request then we are maintaining
354          * only newest in hbase table and ignoring other. Check - HBASE-12065
355          */
356         if (CellUtil.isDeleteFamily(kv)) {
357           Delete deleteFamily = new Delete(key.get());
358           deleteFamily.addDeleteMarker(kv);
359           if (durability != null) {
360             deleteFamily.setDurability(durability);
361           }
362           deleteFamily.setClusterIds(clusterIds);
363           context.write(key, deleteFamily);
364         } else if (CellUtil.isDelete(kv)) {
365           if (delete == null) {
366             delete = new Delete(key.get());
367           }
368           delete.addDeleteMarker(kv);
369         } else {
370           if (put == null) {
371             put = new Put(key.get());
372           }
373           addPutToKv(put, kv);
374         }
375       }
376       if (put != null) {
377         if (durability != null) {
378           put.setDurability(durability);
379         }
380         put.setClusterIds(clusterIds);
381         context.write(key, put);
382       }
383       if (delete != null) {
384         if (durability != null) {
385           delete.setDurability(durability);
386         }
387         delete.setClusterIds(clusterIds);
388         context.write(key, delete);
389       }
390     }
391 
392     protected void addPutToKv(Put put, Cell kv) throws IOException {
393       put.add(kv);
394     }
395 
396     @Override
397     public void setup(Context context) {
398       LOG.info("Setting up " + getClass() + " mapper.");
399       Configuration conf = context.getConfiguration();
400       cfRenameMap = createCfRenameMap(conf);
401       filter = instantiateFilter(conf);
402       String durabilityStr = conf.get(WAL_DURABILITY);
403       if(durabilityStr != null){
404         durability = Durability.valueOf(durabilityStr.toUpperCase());
405         LOG.info("setting WAL durability to " + durability);
406       } else {
407         LOG.info("setting WAL durability to default.");
408       }
409       // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
410       ZooKeeperWatcher zkw = null;
411       Exception ex = null;
412       try {
413         zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
414         clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
415       } catch (ZooKeeperConnectionException e) {
416         ex = e;
417         LOG.error("Problem connecting to ZooKeper during task setup", e);
418       } catch (KeeperException e) {
419         ex = e;
420         LOG.error("Problem reading ZooKeeper data during task setup", e);
421       } catch (IOException e) {
422         ex = e;
423         LOG.error("Problem setting up task", e);
424       } finally {
425         if (zkw != null) zkw.close();
426       }
427       if (clusterIds == null) {
428         // exit early if setup fails
429         throw new RuntimeException(ex);
430       }
431     }
432   }
433 
434   /**
435    * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
436    * optionally not include in the job output
437    * @param conf {@link Configuration} from which to load the filter
438    * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
439    * @throws IllegalArgumentException if the filter is misconfigured
440    */
441   public static Filter instantiateFilter(Configuration conf) {
442     // get the filter, if it was configured    
443     Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
444     if (filterClass == null) {
445       LOG.debug("No configured filter class, accepting all keyvalues.");
446       return null;
447     }
448     LOG.debug("Attempting to create filter:" + filterClass);
449     String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
450     ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
451     try {
452       Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
453       return (Filter) m.invoke(null, quotedArgs);
454     } catch (IllegalAccessException e) {
455       LOG.error("Couldn't instantiate filter!", e);
456       throw new RuntimeException(e);
457     } catch (SecurityException e) {
458       LOG.error("Couldn't instantiate filter!", e);
459       throw new RuntimeException(e);
460     } catch (NoSuchMethodException e) {
461       LOG.error("Couldn't instantiate filter!", e);
462       throw new RuntimeException(e);
463     } catch (IllegalArgumentException e) {
464       LOG.error("Couldn't instantiate filter!", e);
465       throw new RuntimeException(e);
466     } catch (InvocationTargetException e) {
467       LOG.error("Couldn't instantiate filter!", e);
468       throw new RuntimeException(e);
469     }
470   }
471 
472   private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
473     ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
474     for (String stringArg : stringArgs) {
475       // all the filters' instantiation methods expected quoted args since they are coming from
476       // the shell, so add them here, though it shouldn't really be needed :-/
477       quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
478     }
479     return quotedArgs;
480   }
481 
482   /**
483    * Attempt to filter out the keyvalue
484    * @param kv {@link KeyValue} on which to apply the filter
485    * @return <tt>null</tt> if the key should not be written, otherwise returns the original
486    *         {@link KeyValue}
487    */
488   public static Cell filterKv(Filter filter, Cell kv) throws IOException {
489     // apply the filter and skip this kv if the filter doesn't apply
490     if (filter != null) {
491       Filter.ReturnCode code = filter.filterKeyValue(kv);
492       if (LOG.isTraceEnabled()) {
493         LOG.trace("Filter returned:" + code + " for the key value:" + kv);
494       }
495       // if its not an accept type, then skip this kv
496       if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
497           .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
498         return null;
499       }
500     }
501     return kv;
502   }
503 
504   // helper: create a new KeyValue based on CF rename map
505   private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
506     if(cfRenameMap != null) {
507       // If there's a rename mapping for this CF, create a new KeyValue
508       byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
509       if(newCfName != null) {
510           kv = new KeyValue(kv.getRowArray(), // row buffer 
511                   kv.getRowOffset(),        // row offset
512                   kv.getRowLength(),        // row length
513                   newCfName,                // CF buffer
514                   0,                        // CF offset 
515                   newCfName.length,         // CF length 
516                   kv.getQualifierArray(),   // qualifier buffer
517                   kv.getQualifierOffset(),  // qualifier offset
518                   kv.getQualifierLength(),  // qualifier length
519                   kv.getTimestamp(),        // timestamp
520                   KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
521                   kv.getValueArray(),       // value buffer 
522                   kv.getValueOffset(),      // value offset
523                   kv.getValueLength());     // value length
524       }
525     }
526     return kv;
527   }
528 
529   // helper: make a map from sourceCfName to destCfName by parsing a config key
530   private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
531     Map<byte[], byte[]> cfRenameMap = null;
532     String allMappingsPropVal = conf.get(CF_RENAME_PROP);
533     if(allMappingsPropVal != null) {
534       // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
535       String[] allMappings = allMappingsPropVal.split(",");
536       for (String mapping: allMappings) {
537         if(cfRenameMap == null) {
538             cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
539         }
540         String [] srcAndDest = mapping.split(":");
541         if(srcAndDest.length != 2) {
542             continue;
543         }
544         cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
545       }
546     }
547     return cfRenameMap;
548   }
549 
550   /**
551    * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
552    * the mapper how to rename column families.
553    * 
554    * <p>Alternately, instead of calling this function, you could set the configuration key 
555    * {@link #CF_RENAME_PROP} yourself. The value should look like 
556    * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
557    * the mapper behavior.
558    * 
559    * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
560    *  set
561    * @param renameMap a mapping from source CF names to destination CF names
562    */
563   static public void configureCfRenaming(Configuration conf, 
564           Map<String, String> renameMap) {
565     StringBuilder sb = new StringBuilder();
566     for(Map.Entry<String,String> entry: renameMap.entrySet()) {
567       String sourceCf = entry.getKey();
568       String destCf = entry.getValue();
569 
570       if(sourceCf.contains(":") || sourceCf.contains(",") || 
571               destCf.contains(":") || destCf.contains(",")) {
572         throw new IllegalArgumentException("Illegal character in CF names: " 
573               + sourceCf + ", " + destCf);
574       }
575 
576       if(sb.length() != 0) {
577         sb.append(",");
578       }
579       sb.append(sourceCf + ":" + destCf);
580     }
581     conf.set(CF_RENAME_PROP, sb.toString());
582   }
583 
584   /**
585    * Add a Filter to be instantiated on import
586    * @param conf Configuration to update (will be passed to the job)
587    * @param clazz {@link Filter} subclass to instantiate on the server.
588    * @param filterArgs List of arguments to pass to the filter on instantiation
589    */
590   public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
591       List<String> filterArgs) throws IOException {
592     conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
593     conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
594   }
595 
596   /**
597    * Sets up the actual job.
598    * @param conf The current configuration.
599    * @param args The command line parameters.
600    * @return The newly created job.
601    * @throws IOException When setting up the job fails.
602    */
603   public static Job createSubmittableJob(Configuration conf, String[] args)
604   throws IOException {
605     TableName tableName = TableName.valueOf(args[0]);
606     conf.set(TABLE_NAME, tableName.getNameAsString());
607     Path inputDir = new Path(args[1]);
608     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
609     job.setJarByClass(Importer.class);
610     FileInputFormat.setInputPaths(job, inputDir);
611     job.setInputFormatClass(SequenceFileInputFormat.class);
612     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
613 
614     // make sure we get the filter in the jars
615     try {
616       Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
617       if (filter != null) {
618         TableMapReduceUtil.addDependencyJars(conf, filter);
619       }
620     } catch (Exception e) {
621       throw new IOException(e);
622     }
623 
624     if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
625       LOG.info("Use Large Result!!");
626       try (Connection conn = ConnectionFactory.createConnection(conf); 
627           Table table = conn.getTable(tableName);
628           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
629         HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
630         job.setMapperClass(KeyValueSortImporter.class);
631         job.setReducerClass(KeyValueReducer.class);
632         Path outputDir = new Path(hfileOutPath);
633         FileOutputFormat.setOutputPath(job, outputDir);
634         job.setMapOutputKeyClass(KeyValueWritableComparable.class);
635         job.setMapOutputValueClass(KeyValue.class);
636         job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class", 
637             KeyValueWritableComparable.KeyValueWritableComparator.class,
638             RawComparator.class);
639         Path partitionsPath = 
640             new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
641         FileSystem fs = FileSystem.get(job.getConfiguration());
642         fs.deleteOnExit(partitionsPath);
643         job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
644         job.setNumReduceTasks(regionLocator.getStartKeys().length);
645         TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
646             com.google.common.base.Preconditions.class);
647       }
648     } else if (hfileOutPath != null) {
649       LOG.info("writing to hfiles for bulk load.");
650       job.setMapperClass(KeyValueImporter.class);
651       try (Connection conn = ConnectionFactory.createConnection(conf); 
652           Table table = conn.getTable(tableName);
653           RegionLocator regionLocator = conn.getRegionLocator(tableName)){
654         job.setReducerClass(KeyValueSortReducer.class);
655         Path outputDir = new Path(hfileOutPath);
656         FileOutputFormat.setOutputPath(job, outputDir);
657         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
658         job.setMapOutputValueClass(KeyValue.class);
659         HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
660         TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
661             com.google.common.base.Preconditions.class);
662       }
663     } else {
664       LOG.info("writing directly to table from Mapper.");
665       // No reducers.  Just write straight to table.  Call initTableReducerJob
666       // because it sets up the TableOutputFormat.
667       job.setMapperClass(Importer.class);
668       TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
669       job.setNumReduceTasks(0);
670     }
671     return job;
672   }
673 
674   /*
675    * @param errorMsg Error message.  Can be null.
676    */
677   private static void usage(final String errorMsg) {
678     if (errorMsg != null && errorMsg.length() > 0) {
679       System.err.println("ERROR: " + errorMsg);
680     }
681     System.err.println("Usage: Import [options] <tablename> <inputdir>");
682     System.err.println("By default Import will load data directly into HBase. To instead generate");
683     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
684     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
685     System.err.println("If there is a large result that includes too much KeyValue "
686         + "whitch can occur OOME caused by the memery sort in reducer, pass the option:");
687     System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
688     System.err
689         .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
690     System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
691     System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
692     System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
693         + CF_RENAME_PROP + " property. Futher, filters will only use the"
694         + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
695         + " whether the current row needs to be ignored completely for processing and "
696         + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
697         + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
698         + " the KeyValue.");
699     System.err.println("To import data exported from HBase 0.94, use");
700     System.err.println("  -Dhbase.import.version=0.94");
701     System.err.println("  -D " + JOB_NAME_CONF_KEY
702         + "=jobName - use the specified mapreduce job name for the import");
703     System.err.println("For performance consider the following options:\n"
704         + "  -Dmapreduce.map.speculative=false\n"
705         + "  -Dmapreduce.reduce.speculative=false\n"
706         + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
707             +" Allowed values are the supported durability values"
708             +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
709   }
710 
711   /**
712    * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
713    * need to flush all the regions of the table as the data is held in memory and is also not
714    * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the
715    * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL}
716    */
717   public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
718       InterruptedException {
719     String tableName = conf.get(TABLE_NAME);
720     Admin hAdmin = null;
721     Connection connection = null;
722     String durability = conf.get(WAL_DURABILITY);
723     // Need to flush if the data is written to hbase and skip wal is enabled.
724     if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
725         && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
726       LOG.info("Flushing all data that skipped the WAL.");
727       try {
728         connection = ConnectionFactory.createConnection(conf);
729         hAdmin = connection.getAdmin();
730         hAdmin.flush(TableName.valueOf(tableName));
731       } finally {
732         if (hAdmin != null) {
733           hAdmin.close();
734         }
735         if (connection != null) {
736           connection.close();
737         }
738       }
739     }
740   }
741 
742   @Override
743   public int run(String[] args) throws Exception {
744     if (args.length < 2) {
745       usage("Wrong number of arguments: " + args.length);
746       return -1;
747     }
748     String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
749     if (inputVersionString != null) {
750       getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
751     }
752     Job job = createSubmittableJob(getConf(), args);
753     boolean isJobSuccessful = job.waitForCompletion(true);
754     if(isJobSuccessful){
755       // Flush all the regions of the table
756       flushRegionsIfNecessary(getConf());
757     }
758     long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
759     long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
760     if (outputRecords < inputRecords) {
761       System.err.println("Warning, not all records were imported (maybe filtered out).");
762       if (outputRecords == 0) {
763         System.err.println("If the data was exported from HBase 0.94 "+
764             "consider using -Dhbase.import.version=0.94.");
765       }
766     }
767 
768     return (isJobSuccessful ? 0 : 1);
769   }
770 
771   /**
772    * Main entry point.
773    * @param args The command line parameters.
774    * @throws Exception When running the job fails.
775    */
776   public static void main(String[] args) throws Exception {
777     int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
778     System.exit(errCode);
779   }
780 
781 }