/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * Test Bulk Load and MR on a distributed cluster.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * Row Key -> Long
 *
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random Data.
 *
 * All chains start on row 0.
 * All row keys are > 0.
 *
 * After creating the linked lists they are walked over using a TableMapper based MapReduce job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run.  Each mapper creates one linked list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * hbase.IntegrationTestBulkLoad.replicaCount
 * How many region replicas to configure for the table under test.
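 *
 * Example invocation from the command line (the option values shown are purely illustrative; any
 * of the keys above may be passed as -D options):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
 *     -Dhbase.IntegrationTestBulkLoad.chainLength=1000000 \
 *     -Dhbase.IntegrationTestBulkLoad.numMaps=10 \
 *     -Dhbase.IntegrationTestBulkLoad.numImportRounds=2
 * </pre>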
 */
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM  = Bytes.toBytes("S");
  private static final byte[] DATA_FAM  = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;

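  /**
   * Region coprocessor that injects latency into scans of the primary replica (replica id 0).
   * When region replicas are enabled and the check job reads with timeline consistency, this
   * gives the reads a chance to fall back to a secondary replica.
   */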
  public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    Random r = new Random();
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);
    public SlowMeCoproScanOperations() {}

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      if (countOfOpen.incrementAndGet() == 2) { // slow down the second scanner open
        slowdownCode(e);
      }
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // this will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica
      countOfNext.incrementAndGet();
      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
        slowdownCode(e);
      }
      return true;
    }

    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      }
    }
  }

  /**
   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
   */
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getAdmin();
    TableDescriptor desc = admin.getDescriptor(t);
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
    builder.setCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, builder.build());
  }

  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }

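  /**
   * Creates the test table and runs the configured number of linked list creation MR jobs,
   * bulk loading the output of each round into the table.
   */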
  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

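  /**
   * Returns split keys that divide the positive {@code long} row key space uniformly into
   * {@code numRegions} regions.
   */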
  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

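  /**
   * (Re)creates the test table pre-split into 16 regions, and configures extra region replicas
   * when a non-default replica count is requested.
   */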
  private void setupTable() throws IOException, InterruptedException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        getTablename(),
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16)
    );

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
  }

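  /**
   * Runs a single round of the linked list creation job: mappers emit chain KeyValues that are
   * written as HFiles via HFileOutputFormat2 and then bulk loaded into the table.
   */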
  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = null;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // set the input format so that we can create map tasks with no data input.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Use the identity reducer
    // So nothing to do here.

    // Set this jar.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);
    try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin();
      RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
      // Configure the partitioner and other things needed for HFileOutputFormat.
      HFileOutputFormat2.configureIncrementalLoad(job, admin.getDescriptor(getTablename()),
        regionLocator);
      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));
    }
    // Create a new loader.
    BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
    // Load the HFiles in.
    loader.bulkLoad(getTablename(), p);
    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

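  /**
   * An InputSplit that carries no data and no locations; it exists only so map tasks can be
   * started without any real input to read.
   */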
  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }

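  /**
   * RecordReader that replays a fixed, in-memory array of key/value pairs.
   */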
  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException { }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }
    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }
    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float) index / keys.length;
    }
    @Override
    public void close() throws IOException {
    }
  }

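  /**
   * InputFormat that creates one EmptySplit per requested map task and hands each mapper a single
   * synthetic record: the id of the chain it should build.
   */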
  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations =
          context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      long chainId = Math.abs(new Random().nextLong());
      // ensure that chainId is unique per task and across iterations
      chainId = chainId - (chainId % numMapTasks) + taskId;
      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

      return new FixedRecordReader<>(keys, keys);
    }
  }

  /**
   * Mapper that creates a linked list of KeyValues.
   *
   * Each map task generates one linked list.
   * All lists start on row key 0L.
   * All lists should be CHAIN_LENGTH long.
   */
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Next link in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // What link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Add random data so that large store files are created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
          Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);
        // Move to the next row.
        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a unique row id within this chain for this index */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      // use significant bits from the random number, but pad with index to ensure it is unique
      // this also ensures that we do not reuse row = 0
      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }

  /**
   * Writable class used as the key to group links in the linked list.
   *
   * Used as the key emitted from a pass over the table.
   */
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;
    private Long order;

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    public Long getOrder() {
      return order;
    }

    public Long getChainId() {
      return chainId;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Writable used as the value emitted from a pass over the HBase table.
   */
  public static class LinkChain implements WritableComparable<LinkChain> {

    private Long rk;
    private Long next;

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    public Long getNext() {
      return next;
    }

    public Long getRk() {
      return rk;
    }

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Class to figure out what partition to send a link in the chain to.  This is based upon
   * the linkKey's ChainId.
   */
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey, LinkChain linkChain, int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }

  /**
   * Comparator used to figure out whether two linkKeys should be grouped together.  This is based
   * upon the linkKey's ChainId.
   */
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }

  /**
   * Comparator used to order linkKeys so that they are passed to a reducer in order.  This is
   * based upon linkKey ChainId and Order.
   */
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }

  /**
   * Mapper to pass over the table.
   *
   * For every row there could be multiple chains that landed on this row. So emit a linkKey
   * and value for each.
   */
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }

  /**
   * Class that does the actual checking of the links.
   *
   * All links in the chain should be grouped and sorted when sent to this class.  Then the chain
   * will be traversed making sure that no link is missing and that the chain is the correct
   * length.
   *
   * This will throw an exception if anything is not correct.  That causes the job to fail if any
   * data is corrupt.
   */
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {

        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
                + ". Chain:" + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting "
                + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
                + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length.  Expected " + expectedChainLen + " got "
            + count + ". Chain:" + key.chainId + ", order:" + key.order;
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }

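    /**
     * Logs the verification failure along with the cluster metrics and the table's region list
     * to help diagnose what went wrong.
     */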
    private static void logError(String msg, Context context) throws IOException {
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
          Admin admin = connection.getAdmin()) {
        LOG.error("cluster metrics:\n" + admin.getClusterMetrics());
        LOG.error("table regions:\n"
            + Joiner.on("\n").join(admin.getRegions(table)));
      }
    }
  }

  private void runCheckWithRetry()
      throws IOException, ClassNotFoundException, InterruptedException {
    try {
      runCheck();
    } catch (Throwable t) {
      LOG.warn("Received " + StringUtils.stringifyException(t));
      LOG.warn("Running the check MR job again to see whether the problem was ephemeral");
      runCheck();
      throw t; // we should still fail the test even if the second run succeeds
    }
    // everything green
  }

  /**
   * After adding data to the table, starts an MR job that walks every chain and verifies that no
   * link is missing and that each chain has the expected length.
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

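    // Secondary sort: partition and group purely by chainId so a single reducer call sees an
    // entire chain, while the sort comparator orders that chain's links by (chainId, order).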
    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);
    scan.setBatch(1000);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(
        getTablename().getName(),
        scan,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getAdmin()
                               .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
                               .getLiveServerMetrics().size() * 10)
      );
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheckWithRetry();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
        Bytes.toString(SORT_FAM));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}