/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Import data written by {@link Export}.
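 * <p>
 * A typical command-line invocation, matching the usage text printed by this tool, might look
 * like this (illustrative table name and path):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.Import tableName /path/to/exported/data
 * </pre>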
 */
@InterfaceAudience.Public
public class Import extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory.getLogger(Import.class);
  final static String NAME = "import";
  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
  public final static String TABLE_NAME = "import.table.name";
  public final static String WAL_DURABILITY = "import.wal.durability";
  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  public static class CellWritableComparablePartitioner
    extends Partitioner<CellWritableComparable, Cell> {
    private static CellWritableComparable[] START_KEYS = null;

    @Override
    public int getPartition(CellWritableComparable key, Cell value, int numPartitions) {
      for (int i = 0; i < START_KEYS.length; ++i) {
        if (key.compareTo(START_KEYS[i]) <= 0) {
          return i;
        }
      }
      return START_KEYS.length;
    }

  }

  /**
   * @deprecated Use {@link CellWritableComparablePartitioner}. Will be removed from 3.0 onwards
   */
  @Deprecated
  public static class KeyValueWritableComparablePartitioner
    extends Partitioner<KeyValueWritableComparable, KeyValue> {
    private static KeyValueWritableComparable[] START_KEYS = null;

    @Override
    public int getPartition(KeyValueWritableComparable key, KeyValue value, int numPartitions) {
      for (int i = 0; i < START_KEYS.length; ++i) {
        if (key.compareTo(START_KEYS[i]) <= 0) {
          return i;
        }
      }
      return START_KEYS.length;
    }
  }

  public static class KeyValueWritableComparable
    implements WritableComparable<KeyValueWritableComparable> {

    private KeyValue kv = null;

    static {
      // register this comparator
      WritableComparator.define(KeyValueWritableComparable.class, new KeyValueWritableComparator());
    }

    public KeyValueWritableComparable() {
    }

    public KeyValueWritableComparable(KeyValue kv) {
      this.kv = kv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      KeyValue.write(kv, out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      kv = KeyValue.create(in);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
    public int compareTo(KeyValueWritableComparable o) {
      return CellComparator.getInstance().compare(this.kv, o.kv);
    }

    public static class KeyValueWritableComparator extends WritableComparator {

      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
          KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
          return compare(kv1, kv2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

    }

  }

  public static class CellWritableComparable implements WritableComparable<CellWritableComparable> {

    private Cell kv = null;

    static {
      // register this comparator
      WritableComparator.define(CellWritableComparable.class, new CellWritableComparator());
    }

    public CellWritableComparable() {
    }

    public CellWritableComparable(Cell kv) {
      this.kv = kv;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      // Write the KeyValue wire format (total length, key length, value length, flat key) so
      // readFields() can reconstruct the cell via KeyValue.create(in); the value is left empty.
      int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
      out.writeInt(keyLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
      out.writeInt(keyLen);
      out.writeInt(0);
      PrivateCellUtil.writeFlatKey(kv, out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      kv = KeyValue.create(in);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
        justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
    public int compareTo(CellWritableComparable o) {
      return CellComparator.getInstance().compare(this.kv, o.kv);
    }

    public static class CellWritableComparator extends WritableComparator {

      @Override
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          CellWritableComparable kv1 = new CellWritableComparable();
          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
          CellWritableComparable kv2 = new CellWritableComparable();
          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
          return compare(kv1, kv2);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

    }

  }

  /**
   * @deprecated Use {@link CellReducer}. Will be removed from 3.0 onwards
   */
  @Deprecated
  public static class KeyValueReducer
    extends Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
    protected void reduce(
      KeyValueWritableComparable row, Iterable<KeyValue> kvs, Reducer<KeyValueWritableComparable,
        KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
      throws java.io.IOException, InterruptedException {
      int index = 0;
      for (KeyValue kv : kvs) {
        // Use only the row slice of the backing array, not the whole buffer, as the output key.
        context.write(
          new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()), kv);
        if (++index % 100 == 0) {
          context.setStatus("Wrote " + index + " KeyValues, current row key: "
            + Bytes.toString(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
        }
      }
    }
  }

  public static class CellReducer
    extends Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
    protected void reduce(CellWritableComparable row, Iterable<Cell> kvs,
      Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {
      int index = 0;
      for (Cell kv : kvs) {
        context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
          new MapReduceExtendedCell(kv));
        if (++index % 100 == 0) {
          context.setStatus("Wrote " + index + " cells, current row key: "
            + Bytes.toString(CellUtil.cloneRow(kv)));
        }
      }
    }
  }

  /**
   * @deprecated Use {@link CellSortImporter}. Will be removed from 3.0 onwards
   */
  @Deprecated
  public static class KeyValueSortImporter
    extends TableMapper<KeyValueWritableComparable, KeyValue> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(KeyValueSortImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            // TODO get rid of ensureKeyValue
            KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
            context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing KeyValues", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
      int reduceNum = context.getNumReduceTasks();
      Configuration conf = context.getConfiguration();
      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
      try (Connection conn = ConnectionFactory.createConnection(conf);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        byte[][] startKeys = regionLocator.getStartKeys();
        if (startKeys.length != reduceNum) {
          throw new IOException("Region split after job initialization");
        }
        KeyValueWritableComparable[] startKeyWraps =
          new KeyValueWritableComparable[startKeys.length - 1];
        for (int i = 1; i < startKeys.length; ++i) {
          startKeyWraps[i - 1] =
            new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
        }
        KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
      }
    }
  }

  /**
   * A mapper that just writes out KeyValues.
   * @deprecated Use {@link CellImporter}. Will be removed from 3.0 onwards
   */
  @Deprecated
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification = "Writables are going away and this has been this way forever")
  public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(KeyValueImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            // TODO get rid of ensureKeyValue
            context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing KeyValues", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }

  public static class CellSortImporter extends TableMapper<CellWritableComparable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellSortImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            Cell ret = convertKv(kv, cfRenameMap);
            context.write(new CellWritableComparable(ret), ret);
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing cells", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
      int reduceNum = context.getNumReduceTasks();
      Configuration conf = context.getConfiguration();
      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
      try (Connection conn = ConnectionFactory.createConnection(conf);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        byte[][] startKeys = regionLocator.getStartKeys();
        if (startKeys.length != reduceNum) {
          throw new IOException("Region split after job initialization");
        }
        CellWritableComparable[] startKeyWraps = new CellWritableComparable[startKeys.length - 1];
        for (int i = 1; i < startKeys.length; ++i) {
          startKeyWraps[i - 1] =
            new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
        }
        CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
      }
    }
  }

  /**
   * A mapper that just writes out Cells.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification = "Writables are going away and this has been this way forever")
  public static class CellImporter extends TableMapper<ImmutableBytesWritable, Cell> {
    private Map<byte[], byte[]> cfRenameMap;
    private Filter filter;
    private static final Logger LOG = LoggerFactory.getLogger(CellImporter.class);

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Considering the row: " + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
        }
        if (
          filter == null || !filter.filterRowKey(
            PrivateCellUtil.createFirstOnRow(row.get(), row.getOffset(), (short) row.getLength()))
        ) {
          for (Cell kv : value.rawCells()) {
            kv = filterKv(filter, kv);
            // skip if we filtered it out
            if (kv == null) continue;
            context.write(row, new MapReduceExtendedCell(convertKv(kv, cfRenameMap)));
          }
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing cells", e);
        Thread.currentThread().interrupt();
      }
    }

    @Override
    public void setup(Context context) {
      cfRenameMap = createCfRenameMap(context.getConfiguration());
      filter = instantiateFilter(context.getConfiguration());
    }
  }

  /**
   * Write table content back out as {@link Mutation Mutations} (Puts and Deletes), to be applied
   * directly to a live table via TableOutputFormat.
   */
  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
    private Map<byte[], byte[]> cfRenameMap;
    private List<UUID> clusterIds;
    private Filter filter;
    private Durability durability;

    /**
     * @param row     The current table row key.
     * @param value   The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
      try {
        writeResult(row, value, context);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing mutations", e);
        Thread.currentThread().interrupt();
      }
    }

    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
      throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      if (LOG.isTraceEnabled()) {
        LOG.trace(
          "Considering the row: " + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
      }
      if (
        filter == null || !filter.filterRowKey(
          PrivateCellUtil.createFirstOnRow(key.get(), key.getOffset(), (short) key.getLength()))
      ) {
        processKV(key, result, context, put, delete);
      }
    }

    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
      Delete delete) throws IOException, InterruptedException {
      for (Cell kv : result.rawCells()) {
        kv = filterKv(filter, kv);
        // skip if we filter it out
        if (kv == null) continue;

        kv = convertKv(kv, cfRenameMap);
        // Deletes and Puts are gathered and written when finished
        /*
         * If an Export contains a sequence of mutations and tombstones, Import must restore that
         * sequence exactly. If we combined all Delete tombstones into a single request, some
         * DeleteFamily tombstones could be lost: when multiple DeleteFamily tombstones are
         * submitted in a single Delete request, only the newest one is kept in the hbase table
         * and the others are ignored. Check - HBASE-12065
         */
        if (PrivateCellUtil.isDeleteFamily(kv)) {
          Delete deleteFamily = new Delete(key.get());
          deleteFamily.add(kv);
          if (durability != null) {
            deleteFamily.setDurability(durability);
          }
          deleteFamily.setClusterIds(clusterIds);
          context.write(key, deleteFamily);
        } else if (CellUtil.isDelete(kv)) {
          if (delete == null) {
            delete = new Delete(key.get());
          }
          delete.add(kv);
        } else {
          if (put == null) {
            put = new Put(key.get());
          }
          addPutToKv(put, kv);
        }
      }
      if (put != null) {
        if (durability != null) {
          put.setDurability(durability);
        }
        put.setClusterIds(clusterIds);
        context.write(key, put);
      }
      if (delete != null) {
        if (durability != null) {
          delete.setDurability(durability);
        }
        delete.setClusterIds(clusterIds);
        context.write(key, delete);
      }
    }

    protected void addPutToKv(Put put, Cell kv) throws IOException {
      put.add(kv);
    }

    @Override
    public void setup(Context context) {
      LOG.info("Setting up " + getClass() + " mapper.");
      Configuration conf = context.getConfiguration();
      cfRenameMap = createCfRenameMap(conf);
      filter = instantiateFilter(conf);
      String durabilityStr = conf.get(WAL_DURABILITY);
      if (durabilityStr != null) {
        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
        LOG.info("setting WAL durability to " + durability);
      } else {
        LOG.info("setting WAL durability to default.");
      }
      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
      ZKWatcher zkw = null;
      Exception ex = null;
      try {
        zkw = new ZKWatcher(conf, context.getTaskAttemptID().toString(), null);
        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
      } catch (ZooKeeperConnectionException e) {
        ex = e;
        LOG.error("Problem connecting to ZooKeeper during task setup", e);
      } catch (KeeperException e) {
        ex = e;
        LOG.error("Problem reading ZooKeeper data during task setup", e);
      } catch (IOException e) {
        ex = e;
        LOG.error("Problem setting up task", e);
      } finally {
        if (zkw != null) zkw.close();
      }
      if (clusterIds == null) {
        // exit early if setup fails
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so that
   * some can optionally be excluded from the job output.
   * @param conf {@link Configuration} from which to load the filter
   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
   * @throws IllegalArgumentException if the filter is misconfigured
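   * <p>
   * The configured class is loaded reflectively through the same static factory the HBase shell
   * uses, so it must expose a method with this signature (illustrative):
   *
   * <pre>
   * public static Filter createFilterFromArguments(ArrayList&lt;byte[]&gt; filterArguments)
   * </pre>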
   */
  public static Filter instantiateFilter(Configuration conf) {
    // get the filter, if it was configured
    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
    if (filterClass == null) {
      LOG.debug("No configured filter class, accepting all keyvalues.");
      return null;
    }
    LOG.debug("Attempting to create filter: " + filterClass);
    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
    try {
      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
      return (Filter) m.invoke(null, quotedArgs);
    } catch (IllegalAccessException | SecurityException | NoSuchMethodException
      | IllegalArgumentException | InvocationTargetException e) {
      LOG.error("Couldn't instantiate filter!", e);
      throw new RuntimeException(e);
    }
  }

  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
    ArrayList<byte[]> quotedArgs = new ArrayList<>();
    for (String stringArg : stringArgs) {
      // all the filters' instantiation methods expect quoted args since they come from the
      // shell, so add the quotes here, though it shouldn't really be needed :-/
      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
    }
    return quotedArgs;
  }

  /**
   * Attempt to filter out the keyvalue
   * @param filter the {@link Filter} to apply, or <tt>null</tt> to accept everything
   * @param c      {@link Cell} on which to apply the filter
   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
   *         {@link Cell}
   */
  public static Cell filterKv(Filter filter, Cell c) throws IOException {
    // apply the filter and skip this kv if the filter doesn't apply
    if (filter != null) {
      Filter.ReturnCode code = filter.filterCell(c);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Filter returned: " + code + " for the cell: " + c);
      }
      // if it's not an accept type, then skip this kv
      if (
        !(code.equals(Filter.ReturnCode.INCLUDE)
          || code.equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))
      ) {
        return null;
      }
    }
    return c;
  }

  // helper: create a new KeyValue based on CF rename map
  private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
    if (cfRenameMap != null) {
      // If there's a rename mapping for this CF, create a new KeyValue
      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
      if (newCfName != null) {
        List<Tag> tags = PrivateCellUtil.getTags(kv);
        kv = new KeyValue(kv.getRowArray(), // row buffer
          kv.getRowOffset(), // row offset
          kv.getRowLength(), // row length
          newCfName, // CF buffer
          0, // CF offset
          newCfName.length, // CF length
          kv.getQualifierArray(), // qualifier buffer
          kv.getQualifierOffset(), // qualifier offset
          kv.getQualifierLength(), // qualifier length
          kv.getTimestamp(), // timestamp
          KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
          kv.getValueArray(), // value buffer
          kv.getValueOffset(), // value offset
          kv.getValueLength(), // value length
          tags.size() == 0 ? null : tags);
      }
    }
    return kv;
  }

  // helper: make a map from sourceCfName to destCfName by parsing a config key
  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
    Map<byte[], byte[]> cfRenameMap = null;
    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
    if (allMappingsPropVal != null) {
      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
      String[] allMappings = allMappingsPropVal.split(",");
      for (String mapping : allMappings) {
        if (cfRenameMap == null) {
          cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        }
        String[] srcAndDest = mapping.split(":");
        if (srcAndDest.length != 2) {
          continue;
        }
        cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
      }
    }
    return cfRenameMap;
  }

  /**
   * <p>
   * Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells the mapper
   * how to rename column families.
   * <p>
   * Alternately, instead of calling this function, you could set the configuration key
   * {@link #CF_RENAME_PROP} yourself. The value should look like
   *
   * <pre>
   * srcCf1:destCf1,srcCf2:destCf2,....
   * </pre>
   *
   * This has the same effect on the mapper behavior.
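   * <p>
   * For example, renaming <code>cf1</code> to <code>cf2</code> might look like this
   * (illustrative):
   *
   * <pre>
   * Map&lt;String, String&gt; renameMap = new HashMap&lt;&gt;();
   * renameMap.put("cf1", "cf2");
   * Import.configureCfRenaming(conf, renameMap);
   * </pre>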
   * @param conf      the Configuration in which the {@link #CF_RENAME_PROP} key will be set
   * @param renameMap a mapping from source CF names to destination CF names
   */
  public static void configureCfRenaming(Configuration conf, Map<String, String> renameMap) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> entry : renameMap.entrySet()) {
      String sourceCf = entry.getKey();
      String destCf = entry.getValue();

      if (
        sourceCf.contains(":") || sourceCf.contains(",") || destCf.contains(":")
          || destCf.contains(",")
      ) {
        throw new IllegalArgumentException(
          "Illegal character in CF names: " + sourceCf + ", " + destCf);
      }

      if (sb.length() != 0) {
        sb.append(",");
      }
      sb.append(sourceCf + ":" + destCf);
    }
    conf.set(CF_RENAME_PROP, sb.toString());
  }

  /**
   * Add a Filter to be instantiated on import
   * @param conf       Configuration to update (will be passed to the job)
   * @param clazz      {@link Filter} subclass to instantiate on the server.
   * @param filterArgs List of arguments to pass to the filter on instantiation
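   * <p>
   * For example, restricting the import to rows with a given prefix might look like this
   * (illustrative; PrefixFilter supports the createFilterFromArguments factory):
   *
   * <pre>
   * Import.addFilterAndArguments(conf, PrefixFilter.class, Arrays.asList("row-prefix"));
   * </pre>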
   */
  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
    List<String> filterArgs) throws IOException {
    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
  }

  /**
   * Sets up the actual job.
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
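   * <p>
   * For example, configuring HFile output for a later bulk load, instead of writing directly to
   * the table, might look like this (illustrative paths):
   *
   * <pre>
   * conf.set(Import.BULK_OUTPUT_CONF_KEY, "/path/for/hfile/output");
   * Job job = Import.createSubmittableJob(conf, new String[] { "tableName", "/input/dir" });
   * </pre>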
   */
  public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
    TableName tableName = TableName.valueOf(args[0]);
    conf.set(TABLE_NAME, tableName.getNameAsString());
    Path inputDir = new Path(args[1]);
    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

    // make sure we get the filter in the jars
    try {
      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
      if (filter != null) {
        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
      }
    } catch (Exception e) {
      throw new IOException(e);
    }

    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
      LOG.info("Using large result mode: sorting cells before writing HFiles.");
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        job.setMapperClass(CellSortImporter.class);
        job.setReducerClass(CellReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(CellWritableComparable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
          CellWritableComparable.CellWritableComparator.class, RawComparator.class);
        Path partitionsPath =
          new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
        FileSystem fs = FileSystem.get(job.getConfiguration());
        fs.deleteOnExit(partitionsPath);
        job.setPartitionerClass(CellWritableComparablePartitioner.class);
        job.setNumReduceTasks(regionLocator.getStartKeys().length);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else if (hfileOutPath != null) {
      LOG.info("writing to hfiles for bulk load.");
      job.setMapperClass(CellImporter.class);
      try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        job.setReducerClass(CellSortReducer.class);
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(MapReduceExtendedCell.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
      }
    } else {
      LOG.info("writing directly to table from Mapper.");
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      job.setMapperClass(Importer.class);
      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
      job.setNumReduceTasks(0);
    }
    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import [options] <tablename> <inputdir>");
    System.err.println("By default Import will load data directly into HBase. To instead generate");
    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("If the data contains rows with very many cells, the in-memory sort in the"
      + " reducer can cause an OutOfMemoryError; in that case pass the option:");
    System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
    System.err
      .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
    System.err
      .println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
      + CF_RENAME_PROP + " property. Further, filters will only use the"
      + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
      + " whether the current row needs to be ignored completely for processing and "
      + " Filter#filterCell(Cell) method to determine if the Cell should be added;"
      + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
      + " the Cell.");
    System.err.println("To import data exported from HBase 0.94, use");
    System.err.println("  -Dhbase.import.version=0.94");
    System.err.println("  -D " + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the import");
    System.err.println("For performance consider the following options:\n"
      + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false\n"
      + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
      + " Allowed values are the supported durability values"
      + " like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
  }

  /**
   * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported directly to
   * hbase, the data is held only in memory and is not present in the Write Ahead Log to replay
   * after a crash. This method therefore flushes all regions of the table when data has been
   * imported to hbase with {@link Durability#SKIP_WAL}.
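   * <p>
   * For example, such an import might be launched as (illustrative):
   *
   * <pre>
   * hbase org.apache.hadoop.hbase.mapreduce.Import -Dimport.wal.durability=SKIP_WAL tableName /input/dir
   * </pre>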
   */
  public static void flushRegionsIfNecessary(Configuration conf)
    throws IOException, InterruptedException {
    String tableName = conf.get(TABLE_NAME);
    Admin hAdmin = null;
    Connection connection = null;
    String durability = conf.get(WAL_DURABILITY);
    // Need to flush if the data is written to hbase and skip wal is enabled.
    if (
      conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)
    ) {
      LOG.info("Flushing all data that skipped the WAL.");
      try {
        connection = ConnectionFactory.createConnection(conf);
        hAdmin = connection.getAdmin();
        hAdmin.flush(TableName.valueOf(tableName));
      } finally {
        if (hAdmin != null) {
          hAdmin.close();
        }
        if (connection != null) {
          connection.close();
        }
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
    if (inputVersionString != null) {
      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
    }
    Job job = createSubmittableJob(getConf(), args);
    boolean isJobSuccessful = job.waitForCompletion(true);
    if (isJobSuccessful) {
      // Flush all the regions of the table
      flushRegionsIfNecessary(getConf());
    }
    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    if (outputRecords < inputRecords) {
      System.err.println("Warning, not all records were imported (maybe filtered out).");
      if (outputRecords == 0) {
        System.err.println("If the data was exported from HBase 0.94 "
          + "consider using -Dhbase.import.version=0.94.");
      }
    }

    return (isJobSuccessful ? 0 : 1);
  }

  /**
   * Main entry point.
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
    System.exit(errCode);
  }

}