/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Import data written by {@link Export}.
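 * <p>
 * A typical invocation looks like the sketch below (the launcher script depends on your
 * installation; run the tool without arguments to see the full option list):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.Import &lt;tablename&gt; &lt;inputdir&gt;
 * </pre>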
 */
@InterfaceAudience.Public
public class Import extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(Import.class);
  final static String NAME = "import";
  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
  public final static String TABLE_NAME = "import.table.name";
  public final static String WAL_DURABILITY = "import.wal.durability";
  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public static class CellWritableComparablePartitioner
    extends Partitioner<CellWritableComparable, Cell> {
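    // One boundary per region start key, excluding the first region's empty start key; populated
    // by CellSortImporter.setup(). getPartition routes a cell to the first boundary it sorts at or
    // before, so each reducer receives the cells belonging to a single region.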
    private static CellWritableComparable[] START_KEYS = null;

    @Override
    public int getPartition(CellWritableComparable key, Cell value, int numPartitions) {
      for (int i = 0; i < START_KEYS.length; ++i) {
        if (key.compareTo(START_KEYS[i]) <= 0) {
          return i;
        }
      }
      return START_KEYS.length;
    }

  }

  public static class CellWritableComparable
    implements WritableComparable<CellWritableComparable> {

    private ExtendedCell kv = null;

    static {
      // register this comparator
      WritableComparator.define(CellWritableComparable.class, new CellWritableComparator());
    }

    public CellWritableComparable() {
    }

    public CellWritableComparable(Cell kv) {
      this.kv = (ExtendedCell) kv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
      int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
      out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
      out.writeInt(keyLen);
      out.writeInt(valueLen);
      PrivateCellUtil.writeFlatKey(kv, out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      kv = KeyValue.create(in);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
    public int compareTo(CellWritableComparable o) {
      return CellComparator.getInstance().compare(this.kv, o.kv);
    }

    public static class CellWritableComparator extends WritableComparator {

      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          CellWritableComparable kv1 = new CellWritableComparable();
          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
          CellWritableComparable kv2 = new CellWritableComparable();
          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
          return compare(kv1, kv2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

    }

  }

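  /**
   * Reducer used in the large-result bulk-load path: receives Cells in sorted order and writes
   * each one back out keyed by its row, ready for HFile creation.
   */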
  public static class CellReducer
    extends Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
    @Override
    protected void reduce(CellWritableComparable row, Iterable<Cell> kvs,
      Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {
      int index = 0;
      for (Cell kv : kvs) {
        context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
          new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(kv)));
        if (++index % 100 == 0) context.setStatus("Wrote " + index + " KeyValues, "
          + "and the current rowkey is " + Bytes.toString(CellUtil.cloneRow(kv)));
      }
    }
  }

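  /**
   * Mapper used in the large-result bulk-load path: emits each Cell keyed by a
   * {@link CellWritableComparable} so that individual Cells, rather than whole rows, are sorted
   * and partitioned across the reducers, one reducer per target region.
   */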
  public static class CellSortImporter extends TableMapper<CellWritableComparable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellSortImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(value)) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) {
              continue;
            }
            Cell ret = convertKv(kv, cfRenameMap);
            context.write(new CellWritableComparable(ret),
              new MapReduceExtendedCell((ExtendedCell) ret));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
      int reduceNum = context.getNumReduceTasks();
      Configuration conf = context.getConfiguration();
      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
      try (Connection conn = ConnectionFactory.createConnection(conf);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        byte[][] startKeys = regionLocator.getStartKeys();
        if (startKeys.length != reduceNum) {
          throw new IOException("Region split after job initialization");
        }
        CellWritableComparable[] startKeyWraps = new CellWritableComparable[startKeys.length - 1];
        for (int i = 1; i < startKeys.length; ++i) {
          startKeyWraps[i - 1] =
            new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
        }
        CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
      }
    }
  }

  /**
   * A mapper that just writes out KeyValues.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification = "Writables are going away and this has been this way forever")
  public static class CellImporter extends TableMapper<ImmutableBytesWritable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(value)) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) {
              continue;
            }
            context.write(row, new MapReduceExtendedCell(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while emitting Cell", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }

  /**
   * Write table content directly to the target table as {@link Mutation Mutations} (Puts and
   * Deletes).
   */
  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
    private Map<byte[], byte[]> cfRenameMap;
    private List<UUID> clusterIds;
    private Filter filter;
    private Durability durability;

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        writeResult(row, value, context);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing result", e);
        Thread.currentThread().interrupt();
      }
    }

    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
      throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      if (LOG.isTraceEnabled()) {
        LOG.trace(
          "Considering the row: " + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
      }
      if (
        filter == null || !filter.filterRowKey(
          PrivateCellUtil.createFirstOnRow(key.get(), key.getOffset(), (short) key.getLength()))
      ) {
        processKV(key, result, context, put, delete);
      }
    }

    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
      Delete delete) throws IOException, InterruptedException {
      for (ExtendedCell kv : ClientInternalHelper.getExtendedRawCells(result)) {
        kv = filterKv(filter, kv);
        // skip if we filter it out
        if (kv == null) {
          continue;
        }

        kv = convertKv(kv, cfRenameMap);
        // Deletes and Puts are gathered and written when finished
        /*
         * If an Export contains a sequence of mutations and tombstones, the Import should restore
         * that same sequence. If we combined all Delete tombstones into a single request, some
         * DeleteFamily tombstones could be lost: when multiple DeleteFamily tombstones are
         * submitted in a single Delete request, only the newest is kept in the hbase table and the
         * others are ignored. See HBASE-12065.
         */
        if (PrivateCellUtil.isDeleteFamily(kv)) {
          Delete deleteFamily = new Delete(key.get());
          deleteFamily.add(kv);
          if (durability != null) {
            deleteFamily.setDurability(durability);
          }
          deleteFamily.setClusterIds(clusterIds);
          context.write(key, deleteFamily);
        } else if (CellUtil.isDelete(kv)) {
          if (delete == null) {
            delete = new Delete(key.get());
          }
          delete.add(kv);
        } else {
          if (put == null) {
            put = new Put(key.get());
          }
          addPutToKv(put, kv);
        }
      }
      if (put != null) {
        if (durability != null) {
          put.setDurability(durability);
        }
        put.setClusterIds(clusterIds);
        context.write(key, put);
      }
      if (delete != null) {
        if (durability != null) {
          delete.setDurability(durability);
        }
        delete.setClusterIds(clusterIds);
        context.write(key, delete);
      }
    }

    protected void addPutToKv(Put put, Cell kv) throws IOException {
      put.add(kv);
    }

    @Override
    public void setup(Context context) {
      LOG.info("Setting up " + getClass() + " mapper.");
      Configuration conf = context.getConfiguration();
      cfRenameMap = createCfRenameMap(conf);
      filter = instantiateFilter(conf);
      String durabilityStr = conf.get(WAL_DURABILITY);
      if (durabilityStr != null) {
        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
        LOG.info("setting WAL durability to " + durability);
      } else {
        LOG.info("setting WAL durability to default.");
      }
      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
      ZKWatcher zkw = null;
      Exception ex = null;
      try {
        zkw = new ZKWatcher(conf, context.getTaskAttemptID().toString(), null);
        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
      } catch (ZooKeeperConnectionException e) {
        ex = e;
        LOG.error("Problem connecting to ZooKeeper during task setup", e);
      } catch (KeeperException e) {
        ex = e;
        LOG.error("Problem reading ZooKeeper data during task setup", e);
      } catch (IOException e) {
        ex = e;
        LOG.error("Problem setting up task", e);
      } finally {
        if (zkw != null) zkw.close();
      }
      if (clusterIds == null) {
        // exit early if setup fails
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so that
   * some of them can optionally be excluded from the job output.
   * @param conf {@link Configuration} from which to load the filter
   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
   * @throws IllegalArgumentException if the filter is misconfigured
   */
  public static Filter instantiateFilter(Configuration conf) {
    // get the filter, if it was configured
    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
    if (filterClass == null) {
      LOG.debug("No configured filter class, accepting all keyvalues.");
      return null;
    }
    LOG.debug("Attempting to create filter:" + filterClass);
    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
    try {
      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
      return (Filter) m.invoke(null, quotedArgs);
    } catch (IllegalAccessException | SecurityException | NoSuchMethodException
      | IllegalArgumentException | InvocationTargetException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    }
  }

  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
    ArrayList<byte[]> quotedArgs = new ArrayList<>();
    for (String stringArg : stringArgs) {
      // all the filters' instantiation methods expect quoted args since they normally come from
      // the shell, so add the quotes here, though it shouldn't really be needed :-/
      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
    }
    return quotedArgs;
  }

  /**
   * Attempt to filter out the keyvalue
   * @param filter {@link Filter} to apply, or <tt>null</tt> to accept the cell unconditionally
   * @param c      {@link Cell} on which to apply the filter
   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
   *         {@link Cell}
   */
  public static ExtendedCell filterKv(Filter filter, ExtendedCell c) throws IOException {
    // apply the filter and skip this kv if the filter doesn't apply
    if (filter != null) {
      Filter.ReturnCode code = filter.filterCell(c);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Filter returned:" + code + " for the cell:" + c);
      }
      // if it's not an accept type, then skip this kv
      if (
        !(code.equals(Filter.ReturnCode.INCLUDE)
          || code.equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))
      ) {
        return null;
      }
    }
    return c;
  }

  // helper: create a new KeyValue based on CF rename map
  private static ExtendedCell convertKv(ExtendedCell kv, Map<byte[], byte[]> cfRenameMap) {
    if (cfRenameMap != null) {
      // If there's a rename mapping for this CF, create a new KeyValue
      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
      if (newCfName != null) {
        List<Tag> tags = PrivateCellUtil.getTags(kv);
        kv = new KeyValue(kv.getRowArray(), // row buffer
          kv.getRowOffset(), // row offset
          kv.getRowLength(), // row length
          newCfName, // CF buffer
          0, // CF offset
          newCfName.length, // CF length
          kv.getQualifierArray(), // qualifier buffer
          kv.getQualifierOffset(), // qualifier offset
          kv.getQualifierLength(), // qualifier length
          kv.getTimestamp(), // timestamp
          KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
          kv.getValueArray(), // value buffer
          kv.getValueOffset(), // value offset
          kv.getValueLength(), // value length
          tags.size() == 0 ? null : tags);
      }
    }
    return kv;
  }

  // helper: make a map from sourceCfName to destCfName by parsing a config key
  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
    Map<byte[], byte[]> cfRenameMap = null;
    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
    if (allMappingsPropVal != null) {
      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
      String[] allMappings = allMappingsPropVal.split(",");
      for (String mapping : allMappings) {
        if (cfRenameMap == null) {
          cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        String[] srcAndDest = mapping.split(":");
        if (srcAndDest.length != 2) {
          continue;
        }
        cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
      }
    }
    return cfRenameMap;
  }

  /**
   * <p>
   * Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells the mapper
   * how to rename column families.
   * <p>
   * Alternatively, instead of calling this function, you could set the configuration key
   * {@link #CF_RENAME_PROP} yourself. The value should look like
   *
   * <pre>
   * srcCf1:destCf1,srcCf2:destCf2,....
   * </pre>
   *
   * This would have the same effect on the mapper behavior.
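   * <p>
   * For example (the family names here are purely illustrative):
   *
   * <pre>
   * Map&lt;String, String&gt; renames = new TreeMap&lt;&gt;();
   * renames.put("cf1", "newCf1");
   * Import.configureCfRenaming(conf, renames);
   * // equivalent to conf.set(Import.CF_RENAME_PROP, "cf1:newCf1")
   * </pre>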
   * @param conf      the Configuration in which the {@link #CF_RENAME_PROP} key will be set
   * @param renameMap a mapping from source CF names to destination CF names
   */
  static public void configureCfRenaming(Configuration conf, Map<String, String> renameMap) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : renameMap.entrySet()) {
      String sourceCf = entry.getKey();
      String destCf = entry.getValue();

      if (
        sourceCf.contains(":") || sourceCf.contains(",") || destCf.contains(":")
          || destCf.contains(",")
      ) {
        throw new IllegalArgumentException(
          "Illegal character in CF names: " + sourceCf + ", " + destCf);
      }

      if (sb.length() != 0) {
        sb.append(",");
      }
      sb.append(sourceCf + ":" + destCf);
    }
    conf.set(CF_RENAME_PROP, sb.toString());
  }

  /**
   * Add a Filter to be instantiated on import
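   * <p>
   * For example (PrefixFilter and its argument are illustrative; any {@link Filter} exposing the
   * static <code>createFilterFromArguments(ArrayList&lt;byte[]&gt;)</code> factory used by
   * {@link #instantiateFilter(Configuration)} should work):
   *
   * <pre>
   * Import.addFilterAndArguments(conf, PrefixFilter.class, Collections.singletonList("pre"));
   * </pre>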
   * @param conf       Configuration to update (will be passed to the job)
   * @param clazz      {@link Filter} subclass to instantiate on the server.
   * @param filterArgs List of arguments to pass to the filter on instantiation
   */
  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
    List<String> filterArgs) throws IOException {
    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
  }

  /**
   * Sets up the actual job.
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
    TableName tableName = TableName.valueOf(args[0]);
    conf.set(TABLE_NAME, tableName.getNameAsString());
    Path inputDir = new Path(args[1]);
    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

    // make sure we get the filter in the jars
    try {
      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
      if (filter != null) {
        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
      }
    } catch (Exception e) {
      throw new IOException(e);
    }

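    // Three output modes follow: (1) large-result bulk load, sorting and partitioning individual
    // Cells before writing HFiles, (2) standard bulk load writing HFiles keyed by row, and
    // (3) writing directly to the table with no reducers.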
    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
      LOG.info("Using large result mode.");
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        job.setMapperClass(CellSortImporter.class);
        job.setReducerClass(CellReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(CellWritableComparable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
          CellWritableComparable.CellWritableComparator.class, RawComparator.class);
        Path partitionsPath =
          new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
        FileSystem fs = FileSystem.get(job.getConfiguration());
        fs.deleteOnExit(partitionsPath);
        job.setPartitionerClass(CellWritableComparablePartitioner.class);
        job.setNumReduceTasks(regionLocator.getStartKeys().length);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else if (hfileOutPath != null) {
      LOG.info("writing to hfiles for bulk load.");
      job.setMapperClass(CellImporter.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        job.setReducerClass(CellSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else {
      LOG.info("writing directly to table from Mapper.");
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      job.setMapperClass(Importer.class);
      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
      job.setNumReduceTasks(0);
    }
    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import [options] <tablename> <inputdir>");
    System.err.println("By default Import will load data directly into HBase. To instead generate");
    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
694    System.err.println("If there is a large result that includes too much Cell "
695      + "whitch can occur OOME caused by the memery sort in reducer, pass the option:");
696    System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
697    System.err
698      .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
699    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
700    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
701    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
702      + CF_RENAME_PROP + " property. Futher, filters will only use the"
703      + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
704      + " whether the current row needs to be ignored completely for processing and "
705      + " Filter#filterCell(Cell) method to determine if the Cell should be added;"
706      + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
707      + " the Cell.");
708    System.err.println("To import data exported from HBase 0.94, use");
709    System.err.println("  -Dhbase.import.version=0.94");
710    System.err.println("  -D " + JOB_NAME_CONF_KEY
711      + "=jobName - use the specified mapreduce job name for the import");
712    System.err.println("For performance consider the following options:\n"
713      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false\n"
714      + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
715      + " Allowed values are the supported durability values"
716      + " like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
717  }
718
  /**
   * If the durability is set to {@link Durability#SKIP_WAL} and data is imported directly into
   * hbase, we need to flush all regions of the table: the data is held only in memory and is not
   * present in the Write Ahead Log to replay after a crash. This method flushes all regions of the
   * table when data has been imported into hbase with {@link Durability#SKIP_WAL}.
   */
  public static void flushRegionsIfNecessary(Configuration conf)
    throws IOException, InterruptedException {
    String tableName = conf.get(TABLE_NAME);
    String durability = conf.get(WAL_DURABILITY);
    // Need to flush if the data is written to hbase and skip wal is enabled.
    if (
      conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)
    ) {
      LOG.info("Flushing all data that skipped the WAL.");
      try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin hAdmin = connection.getAdmin()) {
        hAdmin.flush(TableName.valueOf(tableName));
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
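    // Propagate the input format version (e.g. -Dhbase.import.version=0.94 on the client JVM)
    // into the job configuration so the exported Results can be deserialized correctly.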
    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
    if (inputVersionString != null) {
      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
    }
    Job job = createSubmittableJob(getConf(), args);
    boolean isJobSuccessful = job.waitForCompletion(true);
    if (isJobSuccessful) {
      // Flush all the regions of the table
      flushRegionsIfNecessary(getConf());
    }
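    // Compare map input/output counters to warn when records were dropped, e.g. by a configured
    // filter or an input format/version mismatch.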
    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    if (outputRecords < inputRecords) {
      System.err.println("Warning, not all records were imported (maybe filtered out).");
      if (outputRecords == 0) {
        System.err.println("If the data was exported from HBase 0.94 "
          + "consider using -Dhbase.import.version=0.94.");
      }
    }

    return (isJobSuccessful ? 0 : 1);
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
    System.exit(errCode);
  }

}