
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.lang.reflect.InvocationTargetException;
27  import java.lang.reflect.Method;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.Locale;
32  import java.util.Map;
33  import java.util.TreeMap;
34  import java.util.UUID;
35
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FileSystem;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.CellComparator;
44  import org.apache.hadoop.hbase.CellUtil;
45  import org.apache.hadoop.hbase.HBaseConfiguration;
46  import org.apache.hadoop.hbase.KeyValue;
47  import org.apache.hadoop.hbase.KeyValueUtil;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
50  import org.apache.hadoop.hbase.classification.InterfaceAudience;
51  import org.apache.hadoop.hbase.classification.InterfaceStability;
52  import org.apache.hadoop.hbase.client.Admin;
53  import org.apache.hadoop.hbase.client.Connection;
54  import org.apache.hadoop.hbase.client.ConnectionFactory;
55  import org.apache.hadoop.hbase.client.Delete;
56  import org.apache.hadoop.hbase.client.Durability;
57  import org.apache.hadoop.hbase.client.Mutation;
58  import org.apache.hadoop.hbase.client.Put;
59  import org.apache.hadoop.hbase.client.RegionLocator;
60  import org.apache.hadoop.hbase.client.Result;
61  import org.apache.hadoop.hbase.client.Table;
62  import org.apache.hadoop.hbase.filter.Filter;
63  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
64  import org.apache.hadoop.hbase.util.Bytes;
65  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
66  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
67  import org.apache.hadoop.io.RawComparator;
68  import org.apache.hadoop.io.WritableComparable;
69  import org.apache.hadoop.io.WritableComparator;
70  import org.apache.hadoop.mapreduce.Job;
71  import org.apache.hadoop.mapreduce.Partitioner;
72  import org.apache.hadoop.mapreduce.Reducer;
73  import org.apache.hadoop.mapreduce.TaskCounter;
74  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
75  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
76  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
77  import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
78  import org.apache.hadoop.util.Tool;
79  import org.apache.hadoop.util.ToolRunner;
80  import org.apache.zookeeper.KeeperException;
81  
82
83  /**
84   * Import data written by {@link Export}.
85   */
86  @InterfaceAudience.Public
87  @InterfaceStability.Stable
88  public class Import extends Configured implements Tool {
89    private static final Log LOG = LogFactory.getLog(Import.class);
90    final static String NAME = "import";
91    public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
92    public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
93    public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
94    public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
95    public final static String TABLE_NAME = "import.table.name";
96    public final static String WAL_DURABILITY = "import.wal.durability";
97    public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";
98
99    private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
100
101   public static class KeyValueWritableComparablePartitioner
102       extends Partitioner<KeyValueWritableComparable, KeyValue> {
103     private static KeyValueWritableComparable[] START_KEYS = null;
104     @Override
105     public int getPartition(KeyValueWritableComparable key, KeyValue value,
106         int numPartitions) {
107       for (int i = 0; i < START_KEYS.length; ++i) {
108         if (key.compareTo(START_KEYS[i]) <= 0) {
109           return i;
110         }
111       }
112       return START_KEYS.length;
113     }
114
115   }
116
117   public static class KeyValueWritableComparable
118       implements WritableComparable<KeyValueWritableComparable> {
119
120     private KeyValue kv = null;
121
122     static {
123       // register this comparator
124       WritableComparator.define(KeyValueWritableComparable.class,
125           new KeyValueWritableComparator());
126     }
127
128     public KeyValueWritableComparable() {
129     }
130
131     public KeyValueWritableComparable(KeyValue kv) {
132       this.kv = kv;
133     }
134
135     @Override
136     public void write(DataOutput out) throws IOException {
137       KeyValue.write(kv, out);
138     }
139
140     @Override
141     public void readFields(DataInput in) throws IOException {
142       kv = KeyValue.create(in);
143     }
144
145     @Override
146     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
147       justification="This is wrong, yes, but we should be purging Writables, not fixing them")
148     public int compareTo(KeyValueWritableComparable o) {
149       return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
150     }
151
152     public static class KeyValueWritableComparator extends WritableComparator {
153
154       @Override
155       public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
156         try {
157           KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
158           kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
159           KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
160           kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
161           return compare(kv1, kv2);
162         } catch (IOException e) {
163           throw new RuntimeException(e);
164         }
165       }
166
167     }
168
169   }
170
171   public static class KeyValueReducer
172       extends
173       Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
174     protected void reduce(
175         KeyValueWritableComparable row,
176         Iterable<KeyValue> kvs,
177         Reducer<KeyValueWritableComparable,
178           KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
179         throws java.io.IOException, InterruptedException {
180       int index = 0;
181       for (KeyValue kv : kvs) {
182         context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
183         if (++index % 100 == 0)
184           context.setStatus("Wrote " + index + " KeyValues, "
185               + "and the current rowkey is " + Bytes.toString(kv.getRowArray()));
186       }
187     }
188   }
189
190   public static class KeyValueSortImporter
191       extends TableMapper<KeyValueWritableComparable, KeyValue> {
192     private Map<byte[], byte[]> cfRenameMap;
193     private Filter filter;
194     private static final Log LOG = LogFactory.getLog(KeyValueSortImporter.class);
195
196     /**
197      * @param row  The current table row key.
198      * @param value  The columns.
199      * @param context  The current context.
200      * @throws IOException When something is broken with the data.
201      */
202     @Override
203     public void map(ImmutableBytesWritable row, Result value,
204       Context context)
205     throws IOException {
206       try {
207         if (LOG.isTraceEnabled()) {
208           LOG.trace("Considering the row: "
209               + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
210         }
211         if (filter == null
212             || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
213                 (short) row.getLength()))) {
214           for (Cell kv : value.rawCells()) {
215             kv = filterKv(filter, kv);
216             // skip if we filtered it out
217             if (kv == null) continue;
218             // TODO get rid of ensureKeyValue
219             KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
220             context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
221           }
222         }
223       } catch (InterruptedException e) {
224         e.printStackTrace();
225       }
226     }
227
228     @Override
229     public void setup(Context context) throws IOException {
230       cfRenameMap = createCfRenameMap(context.getConfiguration());
231       filter = instantiateFilter(context.getConfiguration());
232       int reduceNum = context.getNumReduceTasks();
233       Configuration conf = context.getConfiguration();
234       TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
235       try (Connection conn = ConnectionFactory.createConnection(conf);
236           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
237         byte[][] startKeys = regionLocator.getStartKeys();
238         if (startKeys.length != reduceNum) {
239           throw new IOException("Region split after job initialization");
240         }
241         KeyValueWritableComparable[] startKeyWraps =
242             new KeyValueWritableComparable[startKeys.length - 1];
243         for (int i = 1; i < startKeys.length; ++i) {
244           startKeyWraps[i - 1] =
245               new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
246         }
247         KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
248       }
249     }
250   }
251
252   /**
253    * A mapper that just writes out KeyValues.
254    */
255   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
256       justification="Writables are going away and this has been this way forever")
257   public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
258     private Map<byte[], byte[]> cfRenameMap;
259     private Filter filter;
260     private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
261
262     /**
263      * @param row  The current table row key.
264      * @param value  The columns.
265      * @param context  The current context.
266      * @throws IOException When something is broken with the data.
267      */
268     @Override
269     public void map(ImmutableBytesWritable row, Result value,
270       Context context)
271     throws IOException {
272       try {
273         if (LOG.isTraceEnabled()) {
274           LOG.trace("Considering the row: "
275               + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
276         }
277         if (filter == null
278             || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
279                 (short) row.getLength()))) {
280           for (Cell kv : value.rawCells()) {
281             kv = filterKv(filter, kv);
282             // skip if we filtered it out
283             if (kv == null) continue;
284             // TODO get rid of ensureKeyValue
285             context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
286           }
287         }
288       } catch (InterruptedException e) {
289         e.printStackTrace();
290       }
291     }
292
293     @Override
294     public void setup(Context context) {
295       cfRenameMap = createCfRenameMap(context.getConfiguration());
296       filter = instantiateFilter(context.getConfiguration());
297     }
298   }
299
300   /**
301    * Write table content out to files in hdfs.
302    */
303   public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
304     private Map<byte[], byte[]> cfRenameMap;
305     private List<UUID> clusterIds;
306     private Filter filter;
307     private Durability durability;
308
309     /**
310      * @param row  The current table row key.
311      * @param value  The columns.
312      * @param context  The current context.
313      * @throws IOException When something is broken with the data.
314      */
315     @Override
316     public void map(ImmutableBytesWritable row, Result value,
317       Context context)
318     throws IOException {
319       try {
320         writeResult(row, value, context);
321       } catch (InterruptedException e) {
322         e.printStackTrace();
323       }
324     }
325
326     private void writeResult(ImmutableBytesWritable key, Result result, Context context)
327     throws IOException, InterruptedException {
328       Put put = null;
329       Delete delete = null;
330       if (LOG.isTraceEnabled()) {
331         LOG.trace("Considering the row: "
332             + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
333       }
334       if (filter == null
335           || !filter.filterRowKey(CellUtil.createFirstOnRow(key.get(), key.getOffset(),
336               (short) key.getLength()))) {
337         processKV(key, result, context, put, delete);
338       }
339     }
340
341     protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
342         Delete delete) throws IOException, InterruptedException {
343       for (Cell kv : result.rawCells()) {
344         kv = filterKv(filter, kv);
345         // skip if we filtered it out
346         if (kv == null) continue;
347
348         kv = convertKv(kv, cfRenameMap);
349         // Deletes and Puts are gathered and written when finished
350         /*
351          * If an Export contains a sequence of mutations and tombstones, the Import should
352          * restore that sequence as it is. If we combined all Delete tombstones into a single
353          * request, a few DeleteFamily tombstones could be ignored: when multiple DeleteFamily
354          * tombstones are submitted in a single Delete request, only the newest one is kept in
355          * the hbase table and the others are ignored. Check - HBASE-12065
356          */
357         if (CellUtil.isDeleteFamily(kv)) {
358           Delete deleteFamily = new Delete(key.get());
359           deleteFamily.addDeleteMarker(kv);
360           if (durability != null) {
361             deleteFamily.setDurability(durability);
362           }
363           deleteFamily.setClusterIds(clusterIds);
364           context.write(key, deleteFamily);
365         } else if (CellUtil.isDelete(kv)) {
366           if (delete == null) {
367             delete = new Delete(key.get());
368           }
369           delete.addDeleteMarker(kv);
370         } else {
371           if (put == null) {
372             put = new Put(key.get());
373           }
374           addPutToKv(put, kv);
375         }
376       }
377       if (put != null) {
378         if (durability != null) {
379           put.setDurability(durability);
380         }
381         put.setClusterIds(clusterIds);
382         context.write(key, put);
383       }
384       if (delete != null) {
385         if (durability != null) {
386           delete.setDurability(durability);
387         }
388         delete.setClusterIds(clusterIds);
389         context.write(key, delete);
390       }
391     }
392
393     protected void addPutToKv(Put put, Cell kv) throws IOException {
394       put.add(kv);
395     }
396
397     @Override
398     public void setup(Context context) {
399       LOG.info("Setting up " + getClass() + " mapper.");
400       Configuration conf = context.getConfiguration();
401       cfRenameMap = createCfRenameMap(conf);
402       filter = instantiateFilter(conf);
403       String durabilityStr = conf.get(WAL_DURABILITY);
404       if(durabilityStr != null){
405         durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
406         LOG.info("setting WAL durability to " + durability);
407       } else {
408         LOG.info("setting WAL durability to default.");
409       }
410       // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
411       ZooKeeperWatcher zkw = null;
412       Exception ex = null;
413       try {
414         zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
415         clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
416       } catch (ZooKeeperConnectionException e) {
417         ex = e;
418         LOG.error("Problem connecting to ZooKeeper during task setup", e);
419       } catch (KeeperException e) {
420         ex = e;
421         LOG.error("Problem reading ZooKeeper data during task setup", e);
422       } catch (IOException e) {
423         ex = e;
424         LOG.error("Problem setting up task", e);
425       } finally {
426         if (zkw != null) zkw.close();
427       }
428       if (clusterIds == null) {
429         // exit early if setup fails
430         throw new RuntimeException(ex);
431       }
432     }
433   }
434
435   /**
436    * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so that
437    * some can optionally be excluded from the job output
438    * @param conf {@link Configuration} from which to load the filter
439    * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
440    * @throws IllegalArgumentException if the filter is misconfigured
441    */
442   public static Filter instantiateFilter(Configuration conf) {
443     // get the filter, if it was configured    
444     Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
445     if (filterClass == null) {
446       LOG.debug("No configured filter class, accepting all keyvalues.");
447       return null;
448     }
449     LOG.debug("Attempting to create filter:" + filterClass);
450     String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
451     ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
452     try {
453       Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
454       return (Filter) m.invoke(null, quotedArgs);
455     } catch (IllegalAccessException e) {
456       LOG.error("Couldn't instantiate filter!", e);
457       throw new RuntimeException(e);
458     } catch (SecurityException e) {
459       LOG.error("Couldn't instantiate filter!", e);
460       throw new RuntimeException(e);
461     } catch (NoSuchMethodException e) {
462       LOG.error("Couldn't instantiate filter!", e);
463       throw new RuntimeException(e);
464     } catch (IllegalArgumentException e) {
465       LOG.error("Couldn't instantiate filter!", e);
466       throw new RuntimeException(e);
467     } catch (InvocationTargetException e) {
468       LOG.error("Couldn't instantiate filter!", e);
469       throw new RuntimeException(e);
470     }
471   }
472
473   private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
474     ArrayList<byte[]> quotedArgs = new ArrayList<byte[]>();
475     for (String stringArg : stringArgs) {
476       // all the filters' instantiation methods expect quoted args since they are coming from
477       // the shell, so add them here, though it shouldn't really be needed :-/
478       quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
479     }
480     return quotedArgs;
481   }
482
483   /**
484    * Attempt to filter out the keyvalue
485    * @param kv {@link KeyValue} on which to apply the filter
486    * @return <tt>null</tt> if the key should not be written, otherwise returns the original
487    *         {@link KeyValue}
488    */
489   public static Cell filterKv(Filter filter, Cell kv) throws IOException {
490     // apply the filter and skip this kv if the filter doesn't apply
491     if (filter != null) {
492       Filter.ReturnCode code = filter.filterKeyValue(kv);
493       if (LOG.isTraceEnabled()) {
494         LOG.trace("Filter returned:" + code + " for the key value:" + kv);
495       }
496       // if it's not an accept type, then skip this kv
497       if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
498           .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
499         return null;
500       }
501     }
502     return kv;
503   }
504
505   // helper: create a new KeyValue based on CF rename map
506   private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
507     if(cfRenameMap != null) {
508       // If there's a rename mapping for this CF, create a new KeyValue
509       byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
510       if(newCfName != null) {
511           kv = new KeyValue(kv.getRowArray(), // row buffer 
512                   kv.getRowOffset(),        // row offset
513                   kv.getRowLength(),        // row length
514                   newCfName,                // CF buffer
515                   0,                        // CF offset 
516                   newCfName.length,         // CF length 
517                   kv.getQualifierArray(),   // qualifier buffer
518                   kv.getQualifierOffset(),  // qualifier offset
519                   kv.getQualifierLength(),  // qualifier length
520                   kv.getTimestamp(),        // timestamp
521                   KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
522                   kv.getValueArray(),       // value buffer 
523                   kv.getValueOffset(),      // value offset
524                   kv.getValueLength());     // value length
525       }
526     }
527     return kv;
528   }
529
530   // helper: make a map from sourceCfName to destCfName by parsing a config key
531   private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
532     Map<byte[], byte[]> cfRenameMap = null;
533     String allMappingsPropVal = conf.get(CF_RENAME_PROP);
534     if(allMappingsPropVal != null) {
535       // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
536       String[] allMappings = allMappingsPropVal.split(",");
537       for (String mapping: allMappings) {
538         if(cfRenameMap == null) {
539             cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
540         }
541         String [] srcAndDest = mapping.split(":");
542         if(srcAndDest.length != 2) {
543             continue;
544         }
545         cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
546       }
547     }
548     return cfRenameMap;
549   }
550
551   /**
552    * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
553    * the mapper how to rename column families.
554    * 
555    * <p>Alternately, instead of calling this function, you could set the configuration key 
556    * {@link #CF_RENAME_PROP} yourself. The value should look like 
557    * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
558    * the mapper behavior.
559    * 
560    * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
561    *  set
562    * @param renameMap a mapping from source CF names to destination CF names
563    */
564   static public void configureCfRenaming(Configuration conf,
565           Map<String, String> renameMap) {
566     StringBuilder sb = new StringBuilder();
567     for(Map.Entry<String,String> entry: renameMap.entrySet()) {
568       String sourceCf = entry.getKey();
569       String destCf = entry.getValue();
570
571       if(sourceCf.contains(":") || sourceCf.contains(",") ||
572               destCf.contains(":") || destCf.contains(",")) {
573         throw new IllegalArgumentException("Illegal character in CF names: "
574               + sourceCf + ", " + destCf);
575       }
576
577       if(sb.length() != 0) {
578         sb.append(",");
579       }
580       sb.append(sourceCf + ":" + destCf);
581     }
582     conf.set(CF_RENAME_PROP, sb.toString());
583   }
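
  // A minimal sketch (not part of the original class) of how a driver might use
  // configureCfRenaming before building the import job. The family names "cf_old" and
  // "cf_new" are hypothetical; the call below is equivalent to setting CF_RENAME_PROP
  // to "cf_old:cf_new" by hand.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Map<String, String> renameMap = new HashMap<String, String>();
  //   renameMap.put("cf_old", "cf_new");
  //   Import.configureCfRenaming(conf, renameMap);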
584
585   /**
586    * Add a Filter to be instantiated on import
587    * @param conf Configuration to update (will be passed to the job)
588    * @param clazz {@link Filter} subclass to instantiate on the server.
589    * @param filterArgs List of arguments to pass to the filter on instantiation
590    */
591   public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
592       List<String> filterArgs) throws IOException {
593     conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
594     conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
595   }
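
  // A minimal sketch (not part of the original class) of configuring a filter that the
  // mappers will later build via instantiateFilter. PrefixFilter is an existing HBase
  // filter; the "row-" prefix argument is hypothetical. The same effect can be achieved
  // by setting FILTER_CLASS_CONF_KEY and FILTER_ARGS_CONF_KEY directly.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Import.addFilterAndArguments(conf, org.apache.hadoop.hbase.filter.PrefixFilter.class,
  //       java.util.Arrays.asList("row-"));   // declared to throw IOException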
596
597   /**
598    * Sets up the actual job.
599    * @param conf The current configuration.
600    * @param args The command line parameters.
601    * @return The newly created job.
602    * @throws IOException When setting up the job fails.
603    */
604   public static Job createSubmittableJob(Configuration conf, String[] args)
605   throws IOException {
606     TableName tableName = TableName.valueOf(args[0]);
607     conf.set(TABLE_NAME, tableName.getNameAsString());
608     Path inputDir = new Path(args[1]);
609     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
610     job.setJarByClass(Importer.class);
611     FileInputFormat.setInputPaths(job, inputDir);
612     job.setInputFormatClass(SequenceFileInputFormat.class);
613     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
614
615     // make sure we get the filter in the jars
616     try {
617       Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
618       if (filter != null) {
619         TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
620       }
621     } catch (Exception e) {
622       throw new IOException(e);
623     }
624
625     if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
626       LOG.info("Using large result mode.");
627       try (Connection conn = ConnectionFactory.createConnection(conf);
628           Table table = conn.getTable(tableName);
629           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
630         HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
631         job.setMapperClass(KeyValueSortImporter.class);
632         job.setReducerClass(KeyValueReducer.class);
633         Path outputDir = new Path(hfileOutPath);
634         FileOutputFormat.setOutputPath(job, outputDir);
635         job.setMapOutputKeyClass(KeyValueWritableComparable.class);
636         job.setMapOutputValueClass(KeyValue.class);
637         job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
638             KeyValueWritableComparable.KeyValueWritableComparator.class,
639             RawComparator.class);
640         Path partitionsPath =
641             new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
642         FileSystem fs = FileSystem.get(job.getConfiguration());
643         fs.deleteOnExit(partitionsPath);
644         job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
645         job.setNumReduceTasks(regionLocator.getStartKeys().length);
646         TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
647             com.google.common.base.Preconditions.class);
648       }
649     } else if (hfileOutPath != null) {
650       LOG.info("writing to hfiles for bulk load.");
651       job.setMapperClass(KeyValueImporter.class);
652       try (Connection conn = ConnectionFactory.createConnection(conf);
653           Table table = conn.getTable(tableName);
654           RegionLocator regionLocator = conn.getRegionLocator(tableName)){
655         job.setReducerClass(KeyValueSortReducer.class);
656         Path outputDir = new Path(hfileOutPath);
657         FileOutputFormat.setOutputPath(job, outputDir);
658         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
659         job.setMapOutputValueClass(KeyValue.class);
660         HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
661         TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
662             com.google.common.base.Preconditions.class);
663       }
664     } else {
665       LOG.info("writing directly to table from Mapper.");
666       // No reducers.  Just write straight to table.  Call initTableReducerJob
667       // because it sets up the TableOutputFormat.
668       job.setMapperClass(Importer.class);
669       TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
670       job.setNumReduceTasks(0);
671     }
672     return job;
673   }
674
675   /*
676    * @param errorMsg Error message.  Can be null.
677    */
678   private static void usage(final String errorMsg) {
679     if (errorMsg != null && errorMsg.length() > 0) {
680       System.err.println("ERROR: " + errorMsg);
681     }
682     System.err.println("Usage: Import [options] <tablename> <inputdir>");
683     System.err.println("By default Import will load data directly into HBase. To instead generate");
684     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
685     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
686     System.err.println("If there is a large result that includes too many KeyValues, "
687         + "which can cause an OOME due to the in-memory sort in the reducer, pass the option:");
688     System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
689     System.err
690         .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
691     System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
692     System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
693     System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
694         + CF_RENAME_PROP + " property. Further, filters will only use the"
695         + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
696         + " whether the current row needs to be ignored completely for processing and the "
697         + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
698         + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
699         + " the KeyValue.");
700     System.err.println("To import data exported from HBase 0.94, use");
701     System.err.println("  -Dhbase.import.version=0.94");
702     System.err.println("  -D " + JOB_NAME_CONF_KEY
703         + "=jobName - use the specified mapreduce job name for the import");
704     System.err.println("For performance consider the following options:\n"
705         + "  -Dmapreduce.map.speculative=false\n"
706         + "  -Dmapreduce.reduce.speculative=false\n"
707         + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
708             +" Allowed values are the supported durability values"
709             +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
710   }
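
  // Illustrative invocations (not part of the original class) based on the usage text
  // above; the table name and paths are hypothetical.
  //
  //   # import directly into the table
  //   hbase org.apache.hadoop.hbase.mapreduce.Import myTable /export/dir
  //
  //   # write HFiles for a later bulk load instead of writing to the table
  //   hbase org.apache.hadoop.hbase.mapreduce.Import -Dimport.bulk.output=/hfiles/out myTable /export/dir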
711
712   /**
713    * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
714    * need to flush all the regions of the table as the data is held in memory and is also not
715    * present in the Write Ahead Log to replay in case of a crash. This method flushes all the
716    * regions of the table when data is imported to hbase with {@link Durability#SKIP_WAL}.
717    */
718   public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
719       InterruptedException {
720     String tableName = conf.get(TABLE_NAME);
721     Admin hAdmin = null;
722     Connection connection = null;
723     String durability = conf.get(WAL_DURABILITY);
724     // Need to flush if the data is written to hbase and skip wal is enabled.
725     if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
726         && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
727       LOG.info("Flushing all data that skipped the WAL.");
728       try {
729         connection = ConnectionFactory.createConnection(conf);
730         hAdmin = connection.getAdmin();
731         hAdmin.flush(TableName.valueOf(tableName));
732       } finally {
733         if (hAdmin != null) {
734           hAdmin.close();
735         }
736         if (connection != null) {
737           connection.close();
738         }
739       }
740     }
741   }
742
743   @Override
744   public int run(String[] args) throws Exception {
745     if (args.length < 2) {
746       usage("Wrong number of arguments: " + args.length);
747       return -1;
748     }
749     String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
750     if (inputVersionString != null) {
751       getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
752     }
753     Job job = createSubmittableJob(getConf(), args);
754     boolean isJobSuccessful = job.waitForCompletion(true);
755     if(isJobSuccessful){
756       // Flush all the regions of the table
757       flushRegionsIfNecessary(getConf());
758     }
759     long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
760     long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
761     if (outputRecords < inputRecords) {
762       System.err.println("Warning, not all records were imported (maybe filtered out).");
763       if (outputRecords == 0) {
764         System.err.println("If the data was exported from HBase 0.94 "+
765             "consider using -Dhbase.import.version=0.94.");
766       }
767     }
768
769     return (isJobSuccessful ? 0 : 1);
770   }
771
772   /**
773    * Main entry point.
774    * @param args The command line parameters.
775    * @throws Exception When running the job fails.
776    */
777   public static void main(String[] args) throws Exception {
778     int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
779     System.exit(errCode);
780   }
781
782 }