/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Import data written by {@link Export}.
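 * <p>
 * A typical invocation (see the usage() help text for the full list of options) is, for example:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.Import [options] &lt;tablename&gt; &lt;inputdir&gt;
 * </pre>
 *
 * Passing <code>-Dimport.bulk.output=/path/for/output</code> writes HFiles for bulk loading
 * instead of writing directly to the table.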
 */
@InterfaceAudience.Public
public class Import extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(Import.class);
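  // Configuration keys understood by the Import job; each can be set with -D on the command
  // line (see usage() below).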
  final static String NAME = "import";
  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
  public final static String TABLE_NAME = "import.table.name";
  public final static String WAL_DURABILITY = "import.wal.durability";
  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

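  /**
   * Partitions map output by comparing each key against the target table's region start keys, so
   * that every reducer handles the key range of exactly one region. The START_KEYS array is
   * populated in CellSortImporter#setup() before any records are partitioned.
   */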
  public static class CellWritableComparablePartitioner
    extends Partitioner<CellWritableComparable, Cell> {
    private static CellWritableComparable[] START_KEYS = null;

    @Override
    public int getPartition(CellWritableComparable key, Cell value, int numPartitions) {
      for (int i = 0; i < START_KEYS.length; ++i) {
        if (key.compareTo(START_KEYS[i]) <= 0) {
          return i;
        }
      }
      return START_KEYS.length;
    }

  }

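  /**
   * Wraps a {@link Cell} so it can be used as a sortable map output key. Only the cell's key
   * portion is serialized (the value length is written as 0), and ordering follows
   * CellComparator.getInstance().
   */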
  public static class CellWritableComparable implements WritableComparable<CellWritableComparable> {

    private Cell kv = null;

    static {
      // register this comparator
      WritableComparator.define(CellWritableComparable.class, new CellWritableComparator());
    }

    public CellWritableComparable() {
    }

    public CellWritableComparable(Cell kv) {
      this.kv = kv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv));
      out.writeInt(0);
      PrivateCellUtil.writeFlatKey(kv, out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      kv = KeyValue.create(in);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
    public int compareTo(CellWritableComparable o) {
      return CellComparator.getInstance().compare(this.kv, o.kv);
    }

    public static class CellWritableComparator extends WritableComparator {

      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          CellWritableComparable kv1 = new CellWritableComparable();
          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
          CellWritableComparable kv2 = new CellWritableComparable();
          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
          return compare(kv1, kv2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

    }

  }

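  /**
   * Re-keys the sorted cells by row so that HFileOutputFormat2 can write them out, updating the
   * task status every 100 cells.
   */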
  public static class CellReducer
    extends Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
    protected void reduce(CellWritableComparable row, Iterable<Cell> kvs,
      Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {
      int index = 0;
      for (Cell kv : kvs) {
        context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
          new MapReduceExtendedCell(kv));
        if (++index % 100 == 0) {
          context.setStatus("Wrote " + index + " cells, current row: "
            + Bytes.toString(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
        }
      }
    }
  }

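  /**
   * Mapper used when import.bulk.hasLargeResult is set: each cell is emitted under a
   * CellWritableComparable key so the MapReduce shuffle sorts the cells, instead of relying on
   * the in-memory sort done by CellSortReducer in the regular bulk-load path.
   */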
  public static class CellSortImporter extends TableMapper<CellWritableComparable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellSortImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            Cell ret = convertKv(kv, cfRenameMap);
            context.write(new CellWritableComparable(ret), ret);
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
      int reduceNum = context.getNumReduceTasks();
      Configuration conf = context.getConfiguration();
      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
      try (Connection conn = ConnectionFactory.createConnection(conf);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        byte[][] startKeys = regionLocator.getStartKeys();
        if (startKeys.length != reduceNum) {
          throw new IOException("Region split after job initialization");
        }
        CellWritableComparable[] startKeyWraps = new CellWritableComparable[startKeys.length - 1];
        for (int i = 1; i < startKeys.length; ++i) {
          startKeyWraps[i - 1] =
            new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
        }
        CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
      }
    }
  }

  /**
   * A mapper that just writes out KeyValues.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification = "Writables are going away and this has been this way forever")
  public static class CellImporter extends TableMapper<ImmutableBytesWritable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            context.write(row, new MapReduceExtendedCell(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }

  /**
   * A mapper that writes the incoming {@link Result}s back out as {@link Mutation}s (Puts and
   * Deletes) so they can be written directly into the target table.
   */
  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
    private Map<byte[], byte[]> cfRenameMap;
    private List<UUID> clusterIds;
    private Filter filter;
    private Durability durability;

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        writeResult(row, value, context);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing result", e);
        Thread.currentThread().interrupt();
      }
    }

    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
      throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      if (LOG.isTraceEnabled()) {
        LOG.trace(
          "Considering the row: " + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
      }
      if (
        filter == null || !filter.filterRowKey(
          PrivateCellUtil.createFirstOnRow(key.get(), key.getOffset(), (short) key.getLength()))
      ) {
        processKV(key, result, context, put, delete);
      }
    }

    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
      Delete delete) throws IOException, InterruptedException {
      for (Cell kv : result.rawCells()) {
        kv = filterKv(filter, kv);
        // skip if we filtered it out
        if (kv == null) continue;

        kv = convertKv(kv, cfRenameMap);
        // Deletes and Puts are gathered and written when finished
        /*
         * If an Export contains a sequence of mutations and tombstones, the Import must restore
         * the same sequence. If we combined all Delete tombstones into a single request, some
         * DeleteFamily tombstones could be dropped: when multiple DeleteFamily tombstones are
         * submitted in a single Delete request, only the newest one is kept in the hbase table
         * and the others are ignored. See HBASE-12065.
         */
        if (PrivateCellUtil.isDeleteFamily(kv)) {
          Delete deleteFamily = new Delete(key.get());
          deleteFamily.add(kv);
          if (durability != null) {
            deleteFamily.setDurability(durability);
          }
          deleteFamily.setClusterIds(clusterIds);
          context.write(key, deleteFamily);
        } else if (CellUtil.isDelete(kv)) {
          if (delete == null) {
            delete = new Delete(key.get());
          }
          delete.add(kv);
        } else {
          if (put == null) {
            put = new Put(key.get());
          }
          addPutToKv(put, kv);
        }
      }
      if (put != null) {
        if (durability != null) {
          put.setDurability(durability);
        }
        put.setClusterIds(clusterIds);
        context.write(key, put);
      }
      if (delete != null) {
        if (durability != null) {
          delete.setDurability(durability);
        }
        delete.setClusterIds(clusterIds);
        context.write(key, delete);
      }
    }

    protected void addPutToKv(Put put, Cell kv) throws IOException {
      put.add(kv);
    }

    @Override
    public void setup(Context context) {
      LOG.info("Setting up " + getClass() + " mapper.");
      Configuration conf = context.getConfiguration();
      cfRenameMap = createCfRenameMap(conf);
      filter = instantiateFilter(conf);
      String durabilityStr = conf.get(WAL_DURABILITY);
      if (durabilityStr != null) {
        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
        LOG.info("setting WAL durability to " + durability);
      } else {
        LOG.info("setting WAL durability to default.");
      }
      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
      ZKWatcher zkw = null;
      Exception ex = null;
      try {
        zkw = new ZKWatcher(conf, context.getTaskAttemptID().toString(), null);
        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
      } catch (ZooKeeperConnectionException e) {
        ex = e;
        LOG.error("Problem connecting to ZooKeeper during task setup", e);
      } catch (KeeperException e) {
        ex = e;
        LOG.error("Problem reading ZooKeeper data during task setup", e);
      } catch (IOException e) {
        ex = e;
        LOG.error("Problem setting up task", e);
      } finally {
        if (zkw != null) zkw.close();
      }
      if (clusterIds == null) {
        // exit early if setup fails
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so that
   * some can optionally be excluded from the job output.
   * @param conf {@link Configuration} from which to load the filter
   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
   * @throws IllegalArgumentException if the filter is misconfigured
   */
  public static Filter instantiateFilter(Configuration conf) {
    // get the filter, if it was configured
    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
    if (filterClass == null) {
      LOG.debug("No configured filter class, accepting all keyvalues.");
      return null;
    }
    LOG.debug("Attempting to create filter:" + filterClass);
    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
    try {
      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
      return (Filter) m.invoke(null, quotedArgs);
    } catch (IllegalAccessException | SecurityException | NoSuchMethodException
      | IllegalArgumentException | InvocationTargetException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    }
  }

  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
    ArrayList<byte[]> quotedArgs = new ArrayList<>();
    for (String stringArg : stringArgs) {
      // all the filters' instantiation methods expect quoted args since they are coming from
      // the shell, so add them here, though it shouldn't really be needed :-/
      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
    }
    return quotedArgs;
  }

  /**
   * Attempt to filter out the keyvalue
   * @param c {@link Cell} on which to apply the filter
   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
   *         {@link Cell}
   */
  public static Cell filterKv(Filter filter, Cell c) throws IOException {
    // apply the filter and skip this kv if the filter doesn't apply
    if (filter != null) {
      Filter.ReturnCode code = filter.filterCell(c);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Filter returned:" + code + " for the cell:" + c);
      }
      // if it's not an accept type, then skip this kv
      if (
        !(code.equals(Filter.ReturnCode.INCLUDE)
          || code.equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))
      ) {
        return null;
      }
    }
    return c;
  }

  // helper: create a new KeyValue based on CF rename map
  private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
    if (cfRenameMap != null) {
      // If there's a rename mapping for this CF, create a new KeyValue
      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
      if (newCfName != null) {
        List<Tag> tags = PrivateCellUtil.getTags(kv);
        kv = new KeyValue(kv.getRowArray(), // row buffer
          kv.getRowOffset(), // row offset
          kv.getRowLength(), // row length
          newCfName, // CF buffer
          0, // CF offset
          newCfName.length, // CF length
          kv.getQualifierArray(), // qualifier buffer
          kv.getQualifierOffset(), // qualifier offset
          kv.getQualifierLength(), // qualifier length
          kv.getTimestamp(), // timestamp
          KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
          kv.getValueArray(), // value buffer
          kv.getValueOffset(), // value offset
          kv.getValueLength(), // value length
          tags.size() == 0 ? null : tags);
      }
    }
    return kv;
  }

  // helper: make a map from sourceCfName to destCfName by parsing a config key
  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
    Map<byte[], byte[]> cfRenameMap = null;
    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
    if (allMappingsPropVal != null) {
      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
      String[] allMappings = allMappingsPropVal.split(",");
      for (String mapping : allMappings) {
        if (cfRenameMap == null) {
          cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        String[] srcAndDest = mapping.split(":");
        if (srcAndDest.length != 2) {
          continue;
        }
        cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
      }
    }
    return cfRenameMap;
  }

  /**
   * <p>
   * Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells the mapper
   * how to rename column families.
   * <p>
   * Alternately, instead of calling this function, you could set the configuration key
   * {@link #CF_RENAME_PROP} yourself. The value should look like
   *
   * <pre>
   * srcCf1:destCf1,srcCf2:destCf2,....
   * </pre>
   *
   * This would have the same effect on the mapper behavior.
   * @param conf      the Configuration in which the {@link #CF_RENAME_PROP} key will be set
   * @param renameMap a mapping from source CF names to destination CF names
   */
  static public void configureCfRenaming(Configuration conf, Map<String, String> renameMap) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : renameMap.entrySet()) {
      String sourceCf = entry.getKey();
      String destCf = entry.getValue();

      if (
        sourceCf.contains(":") || sourceCf.contains(",") || destCf.contains(":")
          || destCf.contains(",")
      ) {
        throw new IllegalArgumentException(
          "Illegal character in CF names: " + sourceCf + ", " + destCf);
      }

      if (sb.length() != 0) {
        sb.append(",");
      }
      sb.append(sourceCf + ":" + destCf);
    }
    conf.set(CF_RENAME_PROP, sb.toString());
  }

  /**
   * Add a Filter to be instantiated on import
   * @param conf       Configuration to update (will be passed to the job)
   * @param clazz      {@link Filter} subclass to instantiate on the server.
   * @param filterArgs List of arguments to pass to the filter on instantiation
   */
  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
    List<String> filterArgs) throws IOException {
    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
  }

  /**
   * Sets up the actual job.
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
    TableName tableName = TableName.valueOf(args[0]);
    conf.set(TABLE_NAME, tableName.getNameAsString());
    Path inputDir = new Path(args[1]);
    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

    // make sure we get the filter in the jars
    try {
      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
      if (filter != null) {
        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
      }
    } catch (Exception e) {
      throw new IOException(e);
    }

    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
      LOG.info("Writing HFiles for bulk load using the large-result code path.");
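      // One reducer per region, partitioned on the region start keys: with the raw comparator
      // configured below, each reducer receives its region's cells already sorted and can stream
      // them into HFiles without buffering them in memory.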
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        job.setMapperClass(CellSortImporter.class);
        job.setReducerClass(CellReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(CellWritableComparable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
          CellWritableComparable.CellWritableComparator.class, RawComparator.class);
        Path partitionsPath =
          new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
        FileSystem fs = FileSystem.get(job.getConfiguration());
        fs.deleteOnExit(partitionsPath);
        job.setPartitionerClass(CellWritableComparablePartitioner.class);
        job.setNumReduceTasks(regionLocator.getStartKeys().length);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else if (hfileOutPath != null) {
      LOG.info("writing to hfiles for bulk load.");
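      // CellSortReducer buffers and sorts each row's cells in memory before writing; if rows are
      // wide enough to risk OOME, use -Dimport.bulk.hasLargeResult=true (handled above) instead.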
      job.setMapperClass(CellImporter.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        job.setReducerClass(CellSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else {
      LOG.info("writing directly to table from Mapper.");
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      job.setMapperClass(Importer.class);
      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
      job.setNumReduceTasks(0);
    }
    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import [options] <tablename> <inputdir>");
    System.err.println("By default Import will load data directly into HBase. To instead generate");
    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("If the result contains so many cells that the in-memory sort in the"
      + " reducer may cause an OutOfMemoryError, pass the option:");
    System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
    System.err
      .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
    System.err
      .println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
      + CF_RENAME_PROP + " property. Further, filters will only use the"
      + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
      + " whether the current row needs to be ignored completely for processing and "
      + " Filter#filterCell(Cell) method to determine if the Cell should be added;"
      + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
      + " the Cell.");
    System.err.println("To import data exported from HBase 0.94, use");
    System.err.println("  -Dhbase.import.version=0.94");
    System.err.println("  -D" + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the import");
    System.err.println("For performance consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false\n"
      + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
      + " Allowed values are the supported durability values"
      + " like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
  }

  /**
   * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
   * need to flush all the regions of the table, because the data is only held in memory and is
   * not present in the Write Ahead Log to replay in case of a crash. This method flushes all the
   * regions of the table when data is imported to hbase with {@link Durability#SKIP_WAL}.
   */
  public static void flushRegionsIfNecessary(Configuration conf)
    throws IOException, InterruptedException {
    String tableName = conf.get(TABLE_NAME);
    Admin hAdmin = null;
    Connection connection = null;
    String durability = conf.get(WAL_DURABILITY);
    // Need to flush if the data is written to hbase and skip wal is enabled.
    if (
      conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)
    ) {
      LOG.info("Flushing all data that skipped the WAL.");
      try {
        connection = ConnectionFactory.createConnection(conf);
        hAdmin = connection.getAdmin();
        hAdmin.flush(TableName.valueOf(tableName));
      } finally {
        if (hAdmin != null) {
          hAdmin.close();
        }
        if (connection != null) {
          connection.close();
        }
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
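    // Allow the export format version (e.g. -Dhbase.import.version=0.94, see usage()) to be
    // picked up from the JVM system properties and propagated to the job configuration.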
    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
    if (inputVersionString != null) {
      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
    }
    Job job = createSubmittableJob(getConf(), args);
    boolean isJobSuccessful = job.waitForCompletion(true);
    if (isJobSuccessful) {
      // Flush all the regions of the table
      flushRegionsIfNecessary(getConf());
    }
    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    if (outputRecords < inputRecords) {
      System.err.println("Warning, not all records were imported (maybe filtered out).");
      if (outputRecords == 0) {
        System.err.println("If the data was exported from HBase 0.94 "
          + "consider using -Dhbase.import.version=0.94.");
      }
    }

    return (isJobSuccessful ? 0 : 1);
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
    System.exit(errCode);
  }

}