001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.DataInput;
023import java.io.DataOutput;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Set;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.concurrent.atomic.AtomicLong;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.IntegrationTestBase;
039import org.apache.hadoop.hbase.IntegrationTestingUtility;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.client.ConnectionFactory;
045import org.apache.hadoop.hbase.client.Consistency;
046import org.apache.hadoop.hbase.client.RegionLocator;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.coprocessor.ObserverContext;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.RegionObserver;
055import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
056import org.apache.hadoop.hbase.regionserver.InternalScanner;
057import org.apache.hadoop.hbase.testclassification.IntegrationTests;
058import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.util.RegionSplitter;
062import org.apache.hadoop.io.LongWritable;
063import org.apache.hadoop.io.NullWritable;
064import org.apache.hadoop.io.Writable;
065import org.apache.hadoop.io.WritableComparable;
066import org.apache.hadoop.io.WritableComparator;
067import org.apache.hadoop.io.WritableUtils;
068import org.apache.hadoop.mapreduce.InputFormat;
069import org.apache.hadoop.mapreduce.InputSplit;
070import org.apache.hadoop.mapreduce.Job;
071import org.apache.hadoop.mapreduce.JobContext;
072import org.apache.hadoop.mapreduce.Mapper;
073import org.apache.hadoop.mapreduce.Partitioner;
074import org.apache.hadoop.mapreduce.RecordReader;
075import org.apache.hadoop.mapreduce.Reducer;
076import org.apache.hadoop.mapreduce.TaskAttemptContext;
077import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
078import org.apache.hadoop.util.StringUtils;
079import org.apache.hadoop.util.ToolRunner;
080import org.junit.Test;
081import org.junit.experimental.categories.Category;
082import org.slf4j.Logger;
083import org.slf4j.LoggerFactory;
084
085import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
086import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
087import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
088
/**
 * Test Bulk Load and MR on a distributed cluster. It starts an MR job that creates linked chains.
 * The format of rows is like this: Row Key -> Long L:<< Chain Id >> -> Row Key of the next link in
 * the chain S:<< Chain Id >> -> The step in the chain that this link is. D:<< Chain Id >> -> Random
 * Data. All chains start on row 0. All other rk's are > 0. After creating the linked lists they are
 * walked over using a TableMapper based Mapreduce Job. There are a few options exposed:
 * hbase.IntegrationTestBulkLoad.chainLength The number of rows that will be part of each and every
 * chain. hbase.IntegrationTestBulkLoad.numMaps The number of mappers that will be run. Each mapper
 * creates one linked list chain. hbase.IntegrationTestBulkLoad.numImportRounds How many jobs will
 * be run to create linked lists. hbase.IntegrationTestBulkLoad.tableName The name of the table.
 * hbase.IntegrationTestBulkLoad.replicaCount How many region replicas to configure for the table
 * under test.
 */
102@Category(IntegrationTests.class)
103public class IntegrationTestBulkLoad extends IntegrationTestBase {
104
105  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBulkLoad.class);
106
107  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
108  private static final byte[] SORT_FAM = Bytes.toBytes("S");
109  private static final byte[] DATA_FAM = Bytes.toBytes("D");
110
111  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
112  private static int CHAIN_LENGTH = 500000;
113
114  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
115  private static int NUM_MAPS = 1;
116
117  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
118  private static int NUM_IMPORT_ROUNDS = 1;
119
120  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";
121
122  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
123  private static String TABLE_NAME = "IntegrationTestBulkLoad";
124
125  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
126  private static int NUM_REPLICA_COUNT_DEFAULT = 1;
127
128  private static final String OPT_LOAD = "load";
129  private static final String OPT_CHECK = "check";
130
131  private boolean load = false;
132  private boolean check = false;
133
134  public static class SlowMeCoproScanOperations implements RegionCoprocessor, RegionObserver {
135    static final AtomicLong sleepTime = new AtomicLong(2000);
136    AtomicLong countOfNext = new AtomicLong(0);
137    AtomicLong countOfOpen = new AtomicLong(0);
138
139    public SlowMeCoproScanOperations() {
140    }
141
142    @Override
143    public Optional<RegionObserver> getRegionObserver() {
144      return Optional.of(this);
145    }
146
147    @Override
148    public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
149      final Scan scan) throws IOException {
150      if (countOfOpen.incrementAndGet() == 2) { // slowdown openScanner randomly
151        slowdownCode(e);
152      }
153    }
154
155    @Override
156    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
157      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
158      throws IOException {
159      // this will slow down a certain next operation if the conditions are met. The slowness
160      // will allow the call to go to a replica
161      countOfNext.incrementAndGet();
162      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
163        slowdownCode(e);
164      }
165      return true;
166    }
167
168    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
169      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
170        try {
171          if (sleepTime.get() > 0) {
172            LOG.info("Sleeping for " + sleepTime.get() + " ms");
173            Thread.sleep(sleepTime.get());
174          }
175        } catch (InterruptedException e1) {
176          LOG.error(e1.toString(), e1);
177        }
178      }
179    }
180  }
181
182  /**
183   * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
184   */
185  private void installSlowingCoproc() throws IOException, InterruptedException {
186    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
187    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
188
189    TableName t = getTablename();
190    Admin admin = util.getAdmin();
191    TableDescriptor desc = admin.getDescriptor(t);
192    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(desc);
193    builder.setCoprocessor(SlowMeCoproScanOperations.class.getName());
194    admin.modifyTable(builder.build());
195  }
196
  /**
   * Full test: runs the load phase, installs the slowing coprocessor (a no-op unless a
   * non-default replica count is configured) and then runs the verification phase.
   */
  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }
203
204  public void runLoad() throws Exception {
205    setupTable();
206    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
207    LOG.info("Running load with numIterations:" + numImportRounds);
208    for (int i = 0; i < numImportRounds; i++) {
209      runLinkedListMRJob(i);
210    }
211  }
212
213  private byte[][] getSplits(int numRegions) {
214    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
215    split.setFirstRow(Bytes.toBytes(0L));
216    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
217    return split.split(numRegions);
218  }
219
220  private void setupTable() throws IOException, InterruptedException {
221    if (util.getAdmin().tableExists(getTablename())) {
222      util.deleteTable(getTablename());
223    }
224
225    util.createTable(getTablename(), new byte[][] { CHAIN_FAM, SORT_FAM, DATA_FAM }, getSplits(16));
226
227    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
228    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
229
230    TableName t = getTablename();
231    HBaseTestingUtil.setReplicas(util.getAdmin(), t, replicaCount);
232  }
233
234  private void runLinkedListMRJob(int iteration) throws Exception {
235    String jobName =
236      IntegrationTestBulkLoad.class.getSimpleName() + " - " + EnvironmentEdgeManager.currentTime();
237    Configuration conf = new Configuration(util.getConfiguration());
238    Path p = null;
239    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
240      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
241    } else {
242      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
243    }
244
245    conf.setBoolean("mapreduce.map.speculative", false);
246    conf.setBoolean("mapreduce.reduce.speculative", false);
247    conf.setInt(ROUND_NUM_KEY, iteration);
248
249    Job job = new Job(conf);
250
251    job.setJobName(jobName);
252
253    // set the input format so that we can create map tasks with no data input.
254    job.setInputFormatClass(ITBulkLoadInputFormat.class);
255
256    // Set the mapper classes.
257    job.setMapperClass(LinkedListCreationMapper.class);
258    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
259    job.setMapOutputValueClass(KeyValue.class);
260
261    // Use the identity reducer
262    // So nothing to do here.
263
264    // Set this jar.
265    job.setJarByClass(getClass());
266
267    // Set where to place the hfiles.
268    FileOutputFormat.setOutputPath(job, p);
269    try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin();
270      RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
271      // Configure the partitioner and other things needed for HFileOutputFormat.
272      HFileOutputFormat2.configureIncrementalLoad(job, admin.getDescriptor(getTablename()),
273        regionLocator);
274      // Run the job making sure it works.
275      assertEquals(true, job.waitForCompletion(true));
276    }
277    // Create a new loader.
278    BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
279    // Load the HFiles in.
280    loader.bulkLoad(getTablename(), p);
281    // Delete the files.
282    util.getTestFileSystem().delete(p, true);
283  }
284
285  public static class EmptySplit extends InputSplit implements Writable {
286    @Override
287    public void write(DataOutput out) throws IOException {
288    }
289
290    @Override
291    public void readFields(DataInput in) throws IOException {
292    }
293
294    @Override
295    public long getLength() {
296      return 0L;
297    }
298
299    @Override
300    public String[] getLocations() {
301      return new String[0];
302    }
303  }
304
305  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
306    private int index = -1;
307    private K[] keys;
308    private V[] values;
309
310    public FixedRecordReader(K[] keys, V[] values) {
311      this.keys = keys;
312      this.values = values;
313    }
314
315    @Override
316    public void initialize(InputSplit split, TaskAttemptContext context)
317      throws IOException, InterruptedException {
318    }
319
320    @Override
321    public boolean nextKeyValue() throws IOException, InterruptedException {
322      return ++index < keys.length;
323    }
324
325    @Override
326    public K getCurrentKey() throws IOException, InterruptedException {
327      return keys[index];
328    }
329
330    @Override
331    public V getCurrentValue() throws IOException, InterruptedException {
332      return values[index];
333    }
334
335    @Override
336    public float getProgress() throws IOException, InterruptedException {
337      return (float) index / keys.length;
338    }
339
340    @Override
341    public void close() throws IOException {
342    }
343  }
344
345  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
346    @Override
347    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
348      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
349      ArrayList<InputSplit> ret = new ArrayList<>(numSplits);
350      for (int i = 0; i < numSplits; ++i) {
351        ret.add(new EmptySplit());
352      }
353      return ret;
354    }
355
356    @Override
357    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
358      TaskAttemptContext context) throws IOException, InterruptedException {
359      int taskId = context.getTaskAttemptID().getTaskID().getId();
360      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
361      int numIterations =
362        context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
363      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
364
365      taskId = taskId + iteration * numMapTasks;
366      numMapTasks = numMapTasks * numIterations;
367
368      long chainId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
369      chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per
370                                                            // task and across iterations
371      LongWritable[] keys = new LongWritable[] { new LongWritable(chainId) };
372
373      return new FixedRecordReader<>(keys, keys);
374    }
375  }
376
377  /**
378   * Mapper that creates a linked list of KeyValues. Each map task generates one linked list. All
379   * lists start on row key 0L. All lists should be CHAIN_LENGTH long.
380   */
381  public static class LinkedListCreationMapper
382    extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
383
384    @Override
385    protected void map(LongWritable key, LongWritable value, Context context)
386      throws IOException, InterruptedException {
387      long chainId = value.get();
388      LOG.info("Starting mapper with chainId:" + chainId);
389
390      byte[] chainIdArray = Bytes.toBytes(chainId);
391      long currentRow = 0;
392
393      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
394      long nextRow = getNextRow(0, chainLength);
395      byte[] valueBytes = new byte[50];
396
397      for (long i = 0; i < chainLength; i++) {
398        byte[] rk = Bytes.toBytes(currentRow);
399
400        // Next link in the chain.
401        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
402        // What link in the chain this is.
403        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
404        // Added data so that large stores are created.
405        Bytes.random(valueBytes);
406        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray, valueBytes);
407
408        // Emit the key values.
409        context.write(new ImmutableBytesWritable(rk), linkKv);
410        context.write(new ImmutableBytesWritable(rk), sortKv);
411        context.write(new ImmutableBytesWritable(rk), dataKv);
412        // Move to the next row.
413        currentRow = nextRow;
414        nextRow = getNextRow(i + 1, chainLength);
415      }
416    }
417
418    /** Returns a unique row id within this chain for this index */
419    private long getNextRow(long index, long chainLength) {
420      long nextRow = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
421      // use significant bits from the random number, but pad with index to ensure it is unique
422      // this also ensures that we do not reuse row = 0
423      // row collisions from multiple mappers are fine, since we guarantee unique chainIds
424      nextRow = nextRow - (nextRow % chainLength) + index;
425      return nextRow;
426    }
427  }
428
429  /**
430   * Writable class used as the key to group links in the linked list. Used as the key emited from a
431   * pass over the table.
432   */
433  public static class LinkKey implements WritableComparable<LinkKey> {
434
435    private Long chainId;
436
437    public Long getOrder() {
438      return order;
439    }
440
441    public Long getChainId() {
442      return chainId;
443    }
444
445    private Long order;
446
447    public LinkKey() {
448    }
449
450    public LinkKey(long chainId, long order) {
451      this.chainId = chainId;
452      this.order = order;
453    }
454
455    @Override
456    public int compareTo(LinkKey linkKey) {
457      int res = getChainId().compareTo(linkKey.getChainId());
458      if (res == 0) {
459        res = getOrder().compareTo(linkKey.getOrder());
460      }
461      return res;
462    }
463
464    @Override
465    public void write(DataOutput dataOutput) throws IOException {
466      WritableUtils.writeVLong(dataOutput, chainId);
467      WritableUtils.writeVLong(dataOutput, order);
468    }
469
470    @Override
471    public void readFields(DataInput dataInput) throws IOException {
472      chainId = WritableUtils.readVLong(dataInput);
473      order = WritableUtils.readVLong(dataInput);
474    }
475  }
476
477  /**
478   * Writable used as the value emitted from a pass over the hbase table.
479   */
480  public static class LinkChain implements WritableComparable<LinkChain> {
481
482    public Long getNext() {
483      return next;
484    }
485
486    public Long getRk() {
487      return rk;
488    }
489
490    public LinkChain() {
491    }
492
493    public LinkChain(Long rk, Long next) {
494      this.rk = rk;
495      this.next = next;
496    }
497
498    private Long rk;
499    private Long next;
500
501    @Override
502    public int compareTo(LinkChain linkChain) {
503      int res = getRk().compareTo(linkChain.getRk());
504      if (res == 0) {
505        res = getNext().compareTo(linkChain.getNext());
506      }
507      return res;
508    }
509
510    @Override
511    public void write(DataOutput dataOutput) throws IOException {
512      WritableUtils.writeVLong(dataOutput, rk);
513      WritableUtils.writeVLong(dataOutput, next);
514    }
515
516    @Override
517    public void readFields(DataInput dataInput) throws IOException {
518      rk = WritableUtils.readVLong(dataInput);
519      next = WritableUtils.readVLong(dataInput);
520    }
521  }
522
523  /**
524   * Class to figure out what partition to send a link in the chain to. This is based upon the
525   * linkKey's ChainId.
526   */
527  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
528    @Override
529    public int getPartition(LinkKey linkKey, LinkChain linkChain, int numPartitions) {
530      int hash = linkKey.getChainId().hashCode();
531      return Math.abs(hash % numPartitions);
532    }
533  }
534
535  /**
536   * Comparator used to figure out if a linkKey should be grouped together. This is based upon the
537   * linkKey's ChainId.
538   */
539  public static class NaturalKeyGroupingComparator extends WritableComparator {
540
541    protected NaturalKeyGroupingComparator() {
542      super(LinkKey.class, true);
543    }
544
545    @Override
546    public int compare(WritableComparable w1, WritableComparable w2) {
547      LinkKey k1 = (LinkKey) w1;
548      LinkKey k2 = (LinkKey) w2;
549
550      return k1.getChainId().compareTo(k2.getChainId());
551    }
552  }
553
554  /**
555   * Comparator used to order linkKeys so that they are passed to a reducer in order. This is based
556   * upon linkKey ChainId and Order.
557   */
558  public static class CompositeKeyComparator extends WritableComparator {
559
560    protected CompositeKeyComparator() {
561      super(LinkKey.class, true);
562    }
563
564    @Override
565    public int compare(WritableComparable w1, WritableComparable w2) {
566      LinkKey k1 = (LinkKey) w1;
567      LinkKey k2 = (LinkKey) w2;
568
569      return k1.compareTo(k2);
570    }
571  }
572
573  /**
574   * Mapper to pass over the table. For every row there could be multiple chains that landed on this
575   * row. So emit a linkKey and value for each.
576   */
577  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
578    @Override
579    protected void map(ImmutableBytesWritable key, Result value, Context context)
580      throws IOException, InterruptedException {
581      long longRk = Bytes.toLong(value.getRow());
582
583      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
584        long chainId = Bytes.toLong(entry.getKey());
585        long next = Bytes.toLong(entry.getValue());
586        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
587        long order = Bytes.toLong(CellUtil.cloneValue(c));
588        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
589      }
590    }
591  }
592
593  /**
594   * Class that does the actual checking of the links. All links in the chain should be grouped and
595   * sorted when sent to this class. Then the chain will be traversed making sure that no link is
596   * missing and that the chain is the correct length. This will throw an exception if anything is
597   * not correct. That causes the job to fail if any data is corrupt.
598   */
599  public static class LinkedListCheckingReducer
600    extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
601    @Override
602    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
603      throws java.io.IOException, java.lang.InterruptedException {
604      long next = -1L;
605      long prev = -1L;
606      long count = 0L;
607
608      for (LinkChain lc : values) {
609
610        if (next == -1) {
611          if (lc.getRk() != 0L) {
612            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk() + ". Chain:"
613              + key.chainId + ", order:" + key.order;
614            logError(msg, context);
615            throw new RuntimeException(msg);
616          }
617          next = lc.getNext();
618        } else {
619          if (next != lc.getRk()) {
620            String msg = "Missing a link in the chain. Prev rk " + prev + " was, expecting " + next
621              + " but got " + lc.getRk() + ". Chain:" + key.chainId + ", order:" + key.order;
622            logError(msg, context);
623            throw new RuntimeException(msg);
624          }
625          prev = lc.getRk();
626          next = lc.getNext();
627        }
628        count++;
629      }
630
631      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
632      if (count != expectedChainLen) {
633        String msg = "Chain wasn't the correct length.  Expected " + expectedChainLen + " got "
634          + count + ". Chain:" + key.chainId + ", order:" + key.order;
635        logError(msg, context);
636        throw new RuntimeException(msg);
637      }
638    }
639
640    private static void logError(String msg, Context context) throws IOException {
641      TableName table = getTableName(context.getConfiguration());
642
643      LOG.error("Failure in chain verification: " + msg);
644      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
645        Admin admin = connection.getAdmin()) {
646        LOG.error("cluster metrics:\n" + admin.getClusterMetrics());
647        LOG.error("table regions:\n" + Joiner.on("\n").join(admin.getRegions(table)));
648      }
649    }
650  }
651
652  private void runCheckWithRetry()
653    throws IOException, ClassNotFoundException, InterruptedException {
654    try {
655      runCheck();
656    } catch (Throwable t) {
657      LOG.warn("Received " + StringUtils.stringifyException(t));
658      LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not");
659      runCheck();
660      throw t; // we should still fail the test even if second retry succeeds
661    }
662    // everything green
663  }
664
665  /**
666   * After adding data to the table start a mr job to
667   */
668  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
669    LOG.info("Running check");
670    Configuration conf = getConf();
671    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
672    Path p = util.getDataTestDirOnTestFS(jobName);
673
674    Job job = new Job(conf);
675    job.setJarByClass(getClass());
676    job.setJobName(jobName);
677
678    job.setPartitionerClass(NaturalKeyPartitioner.class);
679    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
680    job.setSortComparatorClass(CompositeKeyComparator.class);
681
682    Scan scan = new Scan();
683    scan.addFamily(CHAIN_FAM);
684    scan.addFamily(SORT_FAM);
685    scan.readVersions(1);
686    scan.setCacheBlocks(false);
687    scan.setBatch(1000);
688
689    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
690    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
691      scan.setConsistency(Consistency.TIMELINE);
692    }
693
694    TableMapReduceUtil.initTableMapperJob(getTablename().getName(), scan,
695      LinkedListCheckingMapper.class, LinkKey.class, LinkChain.class, job);
696
697    job.setReducerClass(LinkedListCheckingReducer.class);
698    job.setOutputKeyClass(NullWritable.class);
699    job.setOutputValueClass(NullWritable.class);
700
701    FileOutputFormat.setOutputPath(job, p);
702
703    assertEquals(true, job.waitForCompletion(true));
704
705    // Delete the files.
706    util.getTestFileSystem().delete(p, true);
707  }
708
709  @Override
710  public void setUpCluster() throws Exception {
711    util = getTestingUtil(getConf());
712    util.initializeCluster(1);
713    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
714    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
715      LOG.debug("Region Replicas enabled: " + replicaCount);
716    }
717
718    // Scale this up on a real cluster
719    if (util.isDistributedCluster()) {
720      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
721        Integer.toString(util.getAdmin().getRegionServers().size() * 10));
722      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
723    } else {
724      util.startMiniMapReduceCluster();
725    }
726  }
727
728  @Override
729  protected void addOptions() {
730    super.addOptions();
731    super.addOptNoArg(OPT_CHECK, "Run check only");
732    super.addOptNoArg(OPT_LOAD, "Run load only");
733  }
734
735  @Override
736  protected void processOptions(CommandLine cmd) {
737    super.processOptions(cmd);
738    check = cmd.hasOption(OPT_CHECK);
739    load = cmd.hasOption(OPT_LOAD);
740  }
741
742  @Override
743  public int runTestFromCommandLine() throws Exception {
744    if (load) {
745      runLoad();
746    } else if (check) {
747      installSlowingCoproc();
748      runCheckWithRetry();
749    } else {
750      testBulkLoad();
751    }
752    return 0;
753  }
754
  /** Returns the table under test, resolved from this tool's configuration. */
  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }
759
760  public static TableName getTableName(Configuration conf) {
761    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
762  }
763
764  @Override
765  protected Set<String> getColumnFamilies() {
766    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
767      Bytes.toString(SORT_FAM));
768  }
769
770  public static void main(String[] args) throws Exception {
771    Configuration conf = HBaseConfiguration.create();
772    IntegrationTestingUtility.setUseDistributedCluster(conf);
773    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
774    System.exit(status);
775  }
776}