/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * Test Bulk Load and MR on a distributed cluster.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * Row Key -> Long
 *
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random data.
 *
 * All chains start on row 0.
 * All row keys are > 0.
 *
 * After creating the linked lists they are walked over using a TableMapper-based MapReduce job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run.  Each mapper creates one linked list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * hbase.IntegrationTestBulkLoad.replicaCount
 * How many region replicas to configure for the table under test.
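 *
 * These keys are ordinary configuration properties, so one illustrative (not prescriptive) way to
 * run the test against a distributed cluster is to pass them as -D options; the values below are
 * only examples:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
 *     -Dhbase.IntegrationTestBulkLoad.chainLength=500000 \
 *     -Dhbase.IntegrationTestBulkLoad.numMaps=10 \
 *     -Dhbase.IntegrationTestBulkLoad.numImportRounds=2
 * </pre>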
 */
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM  = Bytes.toBytes("S");
  private static final byte[] DATA_FAM  = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;

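  /**
   * Coprocessor that artificially slows down scans against the primary replica (replica id 0).
   * When region replicas are configured, this gives the TIMELINE-consistency reads issued by the
   * check job a chance to fall over to a secondary replica.
   */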
  public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    Random r = new Random();
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);
    public SlowMeCoproScanOperations() {}

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      if (countOfOpen.incrementAndGet() == 2) { // slow down the second scanner open on this region
        slowdownCode(e);
      }
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // this will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica
      countOfNext.incrementAndGet();
      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
        slowdownCode(e);
      }
      return true;
    }
    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      }
    }
  }

  /**
   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
   */
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getAdmin();
    TableDescriptor desc = admin.getDescriptor(t);
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
    builder.setCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, builder.build());
  }

  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }

  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

  private void setupTable() throws IOException, InterruptedException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        getTablename(),
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16)
    );

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
  }

  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = null;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // set the input format so that we can create map tasks with no data input.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // No reducer is set here; HFileOutputFormat2.configureIncrementalLoad below installs the
    // sorting reducer needed to produce HFiles.

    // Set this jar.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(getTablename());
        RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {

      // Configure the partitioner and other things needed for HFileOutputFormat.
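      // configureIncrementalLoad wires in a TotalOrderPartitioner keyed on the table's region
      // boundaries, so each reducer writes HFiles that line up with a single region for the
      // bulk load below.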
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);

      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));

      // Create a new loader.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

      // Load the HFiles in.
      loader.doBulkLoad(p, admin, table, regionLocator);
    }

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }

  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException { }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }
    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }
    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float)index / keys.length;
    }
    @Override
    public void close() throws IOException {
    }
  }

  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations =
          context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

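      // Offset the task id by the import round so that every (round, task) pair gets its own slot
      // in [0, numMapTasks * numIterations); the chain id is then made congruent to that slot,
      // keeping chain ids unique across rounds and map tasks.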
      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      long chainId = Math.abs(new Random().nextLong());
      // ensure that chainId is unique per task and across iterations
      chainId = chainId - (chainId % numMapTasks) + taskId;
      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

      return new FixedRecordReader<>(keys, keys);
    }
  }

  /**
   * Mapper that creates a linked list of KeyValues.
   *
   * Each map task generates one linked list.
   * All lists start on row key 0L.
   * All lists should be CHAIN_LENGTH long.
   */
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Next link in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // What link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Added data so that large stores are created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
          Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);
        // Move to the next row.
        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a unique row id within this chain for this index */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      // use significant bits from the random number, but pad with index to ensure it is unique
      // this also ensures that we do not reuse row = 0
      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }

  /**
   * Writable class used as the key to group links in the linked list.
   *
   * Used as the key emitted from a pass over the table.
   */
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;

    public Long getOrder() {
      return order;
    }

    public Long getChainId() {
      return chainId;
    }

    private Long order;

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Writable used as the value emitted from a pass over the HBase table.
   */
  public static class LinkChain implements WritableComparable<LinkChain> {

    public Long getNext() {
      return next;
    }

    public Long getRk() {
      return rk;
    }

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    private Long rk;
    private Long next;

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Class to figure out what partition to send a link in the chain to.  This is based upon
   * the linkKey's ChainId.
   */
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
                            LinkChain linkChain,
                            int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }

  /**
   * Comparator used to figure out whether linkKeys should be grouped together.  This is based
   * upon the linkKey's ChainId.
   */
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }

  /**
   * Comparator used to order linkKeys so that they are passed to a reducer in order.  This is based
   * upon linkKey ChainId and Order.
   */
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }

  /**
   * Mapper to pass over the table.
   *
   * For every row there could be multiple chains that landed on it, so emit a LinkKey and
   * LinkChain for each.
   */
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }

  /**
   * Class that does the actual checking of the links.
   *
   * All links in the chain should be grouped and sorted when sent to this class.  Then the chain
   * will be traversed making sure that no link is missing and that the chain is the correct length.
   *
   * This will throw an exception if anything is not correct.  That causes the job to fail if any
   * data is corrupt.
   */
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {

        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
                + ". Chain:" + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting "
                + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
                + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length.  Expected " + expectedChainLen + " got "
            + count + ". Chain:" + key.chainId + ", order:" + key.order;
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }

    private static void logError(String msg, Context context) throws IOException {
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
          Admin admin = connection.getAdmin()) {
        LOG.error("cluster status:\n" + admin.getClusterStatus());
        LOG.error("table regions:\n"
            + Joiner.on("\n").join(admin.getTableRegions(table)));
      }
    }
  }

  private void runCheckWithRetry() throws IOException, ClassNotFoundException, InterruptedException {
    try {
      runCheck();
    } catch (Throwable t) {
      LOG.warn("Received " + StringUtils.stringifyException(t));
      LOG.warn("Running the check MR job again to see whether it was an ephemeral problem or not");
      runCheck();
      throw t; // we should still fail the test even if second retry succeeds
    }
    // everything green
  }

  /**
   * After adding data to the table, start an MR job that verifies every chain is complete and of
   * the expected length.
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

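    // Secondary sort: partition and group by chain id, but sort by (chain id, order) so that each
    // reduce() call sees exactly one chain with its links already in order.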
    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);
    scan.setBatch(1000);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(
        getTablename().getName(),
        scan,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
        Integer.toString(util.getAdmin().getRegionServers().size() * 10));
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheckWithRetry();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
        Bytes.toString(SORT_FAM));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}