/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * Test Bulk Load and MR on a distributed cluster.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * Row Key -> Long
 *
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random Data.
 *
 * All chains start on row 0.
 * All rk's are > 0.
 *
 * After creating the linked lists they are walked over using a TableMapper-based MapReduce job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run.  Each mapper creates one linked list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * hbase.IntegrationTestBulkLoad.replicaCount
 * How many region replicas to configure for the table under test.
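 *
 * As a rough, illustrative example (the exact invocation depends on the deployment), the test
 * can be launched against a distributed cluster with the options above passed as -D properties:
 *
 *   hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
 *       -Dhbase.IntegrationTestBulkLoad.chainLength=500000 \
 *       -Dhbase.IntegrationTestBulkLoad.replicaCount=3 \
 *       -load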
 */
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM  = Bytes.toBytes("S");
  private static final byte[] DATA_FAM  = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;

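  /**
   * Region coprocessor that artificially delays scan operations on the primary replica
   * (replica id 0). With timeline-consistent reads enabled, the delay encourages the check
   * job's scans to fall back to secondary region replicas.
   */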
  public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    Random r = new Random();
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);
    public SlowMeCoproScanOperations() {}

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan) throws IOException {
      if (countOfOpen.incrementAndGet() == 2) { // slow down the second openScanner call
        slowdownCode(e);
      }
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // This will slow down a certain next operation if the conditions are met. The slowness
      // will allow the call to go to a replica.
      countOfNext.incrementAndGet();
      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
        slowdownCode(e);
      }
      return true;
    }

    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1.toString(), e1);
        }
      }
    }
  }

  /**
   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
   */
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getAdmin();
    TableDescriptor desc = admin.getDescriptor(t);
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
    builder.setCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, builder.build());
  }

  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }

  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

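  /**
   * Create split keys that evenly divide the row key space from 0 to Long.MAX_VALUE into the
   * requested number of regions.
   */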
  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

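  /**
   * (Re)create the test table with the three column families, pre-split into 16 regions, and
   * configure region replicas when a replica count greater than the default is requested.
   */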
  private void setupTable() throws IOException, InterruptedException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        getTablename(),
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16)
    );

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getAdmin(), t, replicaCount);
  }

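  /**
   * Run one round of the linked-list creation job: write HFiles with
   * {@link HFileOutputFormat2} and then bulk load them into the test table.
   */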
  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = null;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);

    job.setJobName(jobName);

    // Set the input format so that we can create map tasks with no data input.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Use the identity reducer
    // So nothing to do here.

    // Set this jar.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(getTablename());
        RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {

      // Configure the partitioner and other things needed for HFileOutputFormat.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);

      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));

      // Create a new loader.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

      // Load the HFiles in.
      loader.doBulkLoad(p, admin, table, regionLocator);
    }

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

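  /**
   * An input split that carries no data; it exists only so that map tasks can be scheduled
   * without reading any input.
   */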
  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }

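  /**
   * RecordReader that serves a fixed, in-memory set of key/value pairs supplied at
   * construction time.
   */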
  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException { }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float) index / keys.length;
    }

    @Override
    public void close() throws IOException {
    }
  }

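  /**
   * InputFormat that creates one {@link EmptySplit} per requested map task and hands each
   * mapper a single, unique chain id to build a linked list from.
   */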
  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations =
          context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      long chainId = Math.abs(new Random().nextLong());
      // Ensure that the chainId is unique per task and across iterations.
      chainId = chainId - (chainId % numMapTasks) + taskId;
      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

      return new FixedRecordReader<>(keys, keys);
    }
  }

  /**
   * Mapper that creates a linked list of KeyValues.
   *
   * Each map task generates one linked list.
   * All lists start on row key 0L.
   * All lists should be CHAIN_LENGTH long.
   */
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // Next link in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // What link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Added data so that large stores are created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
          Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);
        // Move to the next row.
        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a unique row id within this chain for this index. */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      // Use significant bits from the random number, but pad with index to ensure it is unique.
      // This also ensures that we do not reuse row = 0.
      // Row collisions from multiple mappers are fine, since we guarantee unique chainIds.
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }

  /**
   * Writable class used as the key to group links in the linked list.
   *
   * Used as the key emitted from a pass over the table.
   */
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;
    private Long order;

    public Long getOrder() {
      return order;
    }

    public Long getChainId() {
      return chainId;
    }

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Writable used as the value emitted from a pass over the hbase table.
   */
  public static class LinkChain implements WritableComparable<LinkChain> {

    private Long rk;
    private Long next;

    public Long getNext() {
      return next;
    }

    public Long getRk() {
      return rk;
    }

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }

  /**
   * Class to figure out what partition to send a link in the chain to.  This is based upon
   * the linkKey's ChainId.
   */
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
                            LinkChain linkChain,
                            int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }

  /**
   * Comparator used to figure out if a linkKey should be grouped together.  This is based upon the
   * linkKey's ChainId.
   */
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }

  /**
   * Comparator used to order linkKeys so that they are passed to a reducer in order.  This is based
   * upon linkKey ChainId and Order.
   */
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }

  /**
   * Mapper to pass over the table.
   *
   * For every row there could be multiple chains that landed on this row. So emit a linkKey
   * and value for each.
   */
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }

  /**
   * Class that does the actual checking of the links.
   *
   * All links in the chain should be grouped and sorted when sent to this class.  Then the chain
   * will be traversed making sure that no link is missing and that the chain is the correct length.
   *
   * This will throw an exception if anything is not correct.  That causes the job to fail if any
   * data is corrupt.
   */
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {

        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
                + ". Chain:" + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting "
                + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
                + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length.  Expected " + expectedChainLen + " got "
            + count + ". Chain:" + key.chainId + ", order:" + key.order;
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }

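    /**
     * Log the verification failure along with the cluster status and the table's regions to
     * aid debugging.
     */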
    private static void logError(String msg, Context context) throws IOException {
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
          Admin admin = connection.getAdmin()) {
        LOG.error("cluster status:\n" + admin.getClusterStatus());
        LOG.error("table regions:\n"
            + Joiner.on("\n").join(admin.getTableRegions(table)));
      }
    }
  }

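  /**
   * Run the check job. If it fails, run it once more to see whether the problem was ephemeral,
   * but fail the test regardless by rethrowing the original error.
   */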
  private void runCheckWithRetry()
      throws IOException, ClassNotFoundException, InterruptedException {
    try {
      runCheck();
    } catch (Throwable t) {
      LOG.warn("Received " + StringUtils.stringifyException(t));
      LOG.warn("Running the check MR job again to see whether it was an ephemeral problem or not");
      runCheck();
      throw t; // we should still fail the test even if second retry succeeds
    }
    // everything green
  }

  /**
   * After data has been added to the table, start an MR job that walks the chains and verifies
   * that every link is present and each chain has the expected length.
   */
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);
    scan.setBatch(1000);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(
        getTablename().getName(),
        scan,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the files.
    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getAdmin()
                               .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
                               .getLiveServerMetrics().size() * 10)
      );
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheckWithRetry();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
        Bytes.toString(SORT_FAM));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}