/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/**
 * Test bulk load and MR on a distributed cluster. It starts an MR job that creates linked chains.
 * Each row looks like this:
 * <ul>
 * <li>Row Key -> Long</li>
 * <li>L:&lt;&lt; Chain Id &gt;&gt; -> Row key of the next link in the chain</li>
 * <li>S:&lt;&lt; Chain Id &gt;&gt; -> The step in the chain that this link is</li>
 * <li>D:&lt;&lt; Chain Id &gt;&gt; -> Random data</li>
 * </ul>
 * All chains start on row 0; all other row keys are greater than 0. After creating the linked
 * lists they are walked over using a TableMapper-based MapReduce job.
 * <p>
 * A few options are exposed:
 * <ul>
 * <li>hbase.IntegrationTestBulkLoad.chainLength - the number of rows that will be part of each
 * and every chain</li>
 * <li>hbase.IntegrationTestBulkLoad.numMaps - the number of mappers that will be run; each mapper
 * creates one linked list chain</li>
 * <li>hbase.IntegrationTestBulkLoad.numImportRounds - how many jobs will be run to create linked
 * lists</li>
 * <li>hbase.IntegrationTestBulkLoad.tableName - the name of the table</li>
 * <li>hbase.IntegrationTestBulkLoad.replicaCount - how many region replicas to configure for the
 * table under test</li>
 * </ul>
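 * <p>
 * As a rough sketch only (not the canonical way to launch integration tests), these keys can be
 * set programmatically before handing the tool to ToolRunner, mirroring what {@code main} does;
 * the values below are illustrative assumptions:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.setInt("hbase.IntegrationTestBulkLoad.numMaps", 10);
 * conf.setInt("hbase.IntegrationTestBulkLoad.numImportRounds", 3);
 * conf.setLong("hbase.IntegrationTestBulkLoad.chainLength", 1000000L);
 * IntegrationTestingUtility.setUseDistributedCluster(conf);
 * int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), new String[0]);
 * </pre>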
 */
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM = Bytes.toBytes("S");
  private static final byte[] DATA_FAM = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;

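  /**
   * Region observer that injects artificial latency into scans against the primary replica
   * (replica id 0). When region replicas are enabled, the delay gives the timeline-consistent
   * check job a chance to fall back to a secondary replica.
   */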
  public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);

    public SlowMeCoproScanOperations() {
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Scan scan) throws IOException {
      if (countOfOpen.incrementAndGet() == 2) { // only slow down the second scanner open
        slowdownCode(e);
      }
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
      throws IOException {
      // This will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica.
      countOfNext.incrementAndGet();
      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
        slowdownCode(e);
      }
      return true;
    }

    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      }
    }
  }

  /**
   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
   */
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getAdmin();
    TableDescriptor desc = admin.getDescriptor(t);
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
    builder.setCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, builder.build());
  }

  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }

  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

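  /**
   * (Re)creates the table under test with the three column families, pre-split into 16 regions,
   * and configures additional region replicas when requested.
   */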
  private void setupTable() throws IOException, InterruptedException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(getTablename(), new byte[][] { CHAIN_FAM, SORT_FAM, DATA_FAM }, getSplits(16));

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
  }

  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName =
      IntegrationTestBulkLoad.class.getSimpleName() + " - " + EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = null;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // set the input format so that we can create map tasks with no data input.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Use the identity reducer, so there is nothing to configure here.

    // Set this jar.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);
    try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin();
      Table table = conn.getTable(getTablename());
      RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {

      // Configure the partitioner and other things needed for HFileOutputFormat.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);

      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));

      // Create a new loader.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

      // Load the HFiles in.
      loader.doBulkLoad(p, admin, table, regionLocator);
    }

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

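  /**
   * An InputSplit that carries no data. It exists only so that map tasks can be spun up without
   * reading any input; each task then generates its own linked list.
   */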
  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException {
    }

    @Override
    public void readFields(DataInput in) throws IOException {
    }

    @Override
    public long getLength() {
      return 0L;
    }

    @Override
    public String[] getLocations() {
      return new String[0];
    }
  }

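  /**
   * RecordReader that replays a fixed, in-memory array of key/value pairs handed to it at
   * construction time.
   */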
  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float) index / keys.length;
    }

    @Override
    public void close() throws IOException {
    }
  }

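  /**
   * InputFormat that produces one {@link EmptySplit} per requested map task and seeds each task's
   * single record with a chain id that is unique across tasks and import rounds.
   */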
  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations =
        context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      long chainId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
      chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per
                                                            // task and across iterations
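      // For example (values chosen purely for illustration): with numMaps=2 and
      // numImportRounds=2, numMapTasks becomes 4 and the effective taskIds are 0,1 (round 0) and
      // 2,3 (round 1), so every chainId is forced to a distinct residue mod 4.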
      LongWritable[] keys = new LongWritable[] { new LongWritable(chainId) };

      return new FixedRecordReader<>(keys, keys);
    }
  }

  /**
   * Mapper that creates a linked list of KeyValues. Each map task generates one linked list. All
   * lists start on row key 0L. All lists should be CHAIN_LENGTH long.
   */
  public static class LinkedListCreationMapper
    extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
      throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);
      byte[] valueBytes = new byte[50];

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Next link in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // What link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Add random data so that large stores are created.
        Bytes.random(valueBytes);
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray, valueBytes);

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);
        // Move to the next row.
        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a unique row id within this chain for this index */
    private long getNextRow(long index, long chainLength) {
      long nextRow = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
      // use significant bits from the random number, but pad with index to ensure it is unique
      // this also ensures that we do not reuse row = 0
      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }

  /**
   * Writable class used as the key to group links in the linked list. Used as the key emitted
   * from a pass over the table.
   */
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;
    private Long order;

    public Long getOrder() {
      return order;
    }

    public Long getChainId() {
      return chainId;
    }

    public LinkKey() {
    }

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Writable used as the value emitted from a pass over the hbase table.
   */
  public static class LinkChain implements WritableComparable<LinkChain> {

    private Long rk;
    private Long next;

    public Long getNext() {
      return next;
    }

    public Long getRk() {
      return rk;
    }

    public LinkChain() {
    }

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Class to figure out what partition to send a link in the chain to. This is based upon the
   * linkKey's ChainId.
   */
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey, LinkChain linkChain, int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }

  /**
   * Comparator used to figure out whether two LinkKeys should be grouped together. This is based
   * upon the linkKey's ChainId.
   */
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }

  /**
   * Comparator used to order linkKeys so that they are passed to a reducer in order. This is based
   * upon linkKey ChainId and Order.
   */
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }

  /**
   * Mapper that passes over the table. Multiple chains can land on the same row, so a LinkKey and
   * LinkChain are emitted for each chain found in the row.
   */
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }

  /**
   * Class that does the actual checking of the links. All links in the chain should be grouped and
   * sorted when sent to this class. Then the chain will be traversed making sure that no link is
   * missing and that the chain is the correct length. This will throw an exception if anything is
   * not correct. That causes the job to fail if any data is corrupt.
   */
  public static class LinkedListCheckingReducer
    extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
      throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {

        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk() + ". Chain:"
              + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting " + next
              + " but got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got "
          + count + ". Chain:" + key.chainId + ", order:" + key.order;
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }

    private static void logError(String msg, Context context) throws IOException {
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
        Admin admin = connection.getAdmin()) {
        LOG.error("cluster status:\n" + admin.getClusterStatus());
        LOG.error("table regions:\n" + Joiner.on("\n").join(admin.getTableRegions(table)));
      }
    }
  }

  private void runCheckWithRetry()
    throws IOException, ClassNotFoundException, InterruptedException {
    try {
      runCheck();
    } catch (Throwable t) {
      LOG.warn("Received " + StringUtils.stringifyException(t));
      LOG.warn("Running the check MR job again to see whether this was an ephemeral problem");
      runCheck();
      throw t; // we should still fail the test even if the second attempt succeeds
    }
    // everything green
  }

  /**
   * After adding data to the table, start an MR job that checks the links in every chain.
   */
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

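    // The partitioner, grouping comparator, and sort comparator below together implement a
    // secondary sort: links are partitioned and grouped by chain id but sorted by (chain id,
    // order), so each reduce() call sees exactly one chain with its links in order.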
    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);
    scan.setBatch(1000);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(getTablename().getName(), scan,
      LinkedListCheckingMapper.class, LinkKey.class, LinkChain.class, job);

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
        Integer.toString(util.getAdmin().getRegionServers().size() * 10));
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheckWithRetry();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
      Bytes.toString(SORT_FAM));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}