001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.security.SecureRandom;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.EnumSet;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Random;
032import java.util.Set;
033import java.util.SortedSet;
034import java.util.TreeSet;
035import java.util.UUID;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.atomic.AtomicInteger;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.LocatedFileStatus;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.RemoteIterator;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.ClusterMetrics.Option;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.HColumnDescriptor;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.HRegionLocation;
051import org.apache.hadoop.hbase.HTableDescriptor;
052import org.apache.hadoop.hbase.IntegrationTestBase;
053import org.apache.hadoop.hbase.IntegrationTestingUtility;
054import org.apache.hadoop.hbase.MasterNotRunningException;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.client.Admin;
057import org.apache.hadoop.hbase.client.BufferedMutator;
058import org.apache.hadoop.hbase.client.BufferedMutatorParams;
059import org.apache.hadoop.hbase.client.Connection;
060import org.apache.hadoop.hbase.client.ConnectionConfiguration;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Get;
063import org.apache.hadoop.hbase.client.Mutation;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.RegionLocator;
066import org.apache.hadoop.hbase.client.Result;
067import org.apache.hadoop.hbase.client.ResultScanner;
068import org.apache.hadoop.hbase.client.Scan;
069import org.apache.hadoop.hbase.client.Table;
070import org.apache.hadoop.hbase.fs.HFileSystem;
071import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
072import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
073import org.apache.hadoop.hbase.mapreduce.TableMapper;
074import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
075import org.apache.hadoop.hbase.mapreduce.WALPlayer;
076import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
077import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
078import org.apache.hadoop.hbase.testclassification.IntegrationTests;
079import org.apache.hadoop.hbase.util.AbstractHBaseTool;
080import org.apache.hadoop.hbase.util.Bytes;
081import org.apache.hadoop.hbase.util.CommonFSUtils;
082import org.apache.hadoop.hbase.util.Random64;
083import org.apache.hadoop.hbase.util.RegionSplitter;
084import org.apache.hadoop.hbase.wal.WALEdit;
085import org.apache.hadoop.hbase.wal.WALKey;
086import org.apache.hadoop.io.BytesWritable;
087import org.apache.hadoop.io.NullWritable;
088import org.apache.hadoop.io.Writable;
089import org.apache.hadoop.mapreduce.Counter;
090import org.apache.hadoop.mapreduce.CounterGroup;
091import org.apache.hadoop.mapreduce.Counters;
092import org.apache.hadoop.mapreduce.InputFormat;
093import org.apache.hadoop.mapreduce.InputSplit;
094import org.apache.hadoop.mapreduce.Job;
095import org.apache.hadoop.mapreduce.JobContext;
096import org.apache.hadoop.mapreduce.Mapper;
097import org.apache.hadoop.mapreduce.RecordReader;
098import org.apache.hadoop.mapreduce.Reducer;
099import org.apache.hadoop.mapreduce.TaskAttemptContext;
100import org.apache.hadoop.mapreduce.TaskAttemptID;
101import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
102import org.apache.hadoop.mapreduce.lib.input.FileSplit;
103import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
104import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
105import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
106import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
107import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
108import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
109import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
110import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
111import org.apache.hadoop.util.Tool;
112import org.apache.hadoop.util.ToolRunner;
113import org.junit.Test;
114import org.junit.experimental.categories.Category;
115import org.slf4j.Logger;
116import org.slf4j.LoggerFactory;
117
118import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
119import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
120import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
121import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
122import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
123import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
124import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
125
/**
 * This is an integration test borrowed from goraci, written by Keith Turner,
 * which is in turn inspired by the Accumulo test called continuous ingest (ci).
 * The original source code can be found here:
 * https://github.com/keith-turner/goraci
 * https://github.com/enis/goraci/
 *
 * Apache Accumulo [0] has a simple test suite that verifies that data is not
 * lost at scale. This test suite is called continuous ingest. This test runs
 * many ingest clients that continually create linked lists containing 25
 * million nodes. At some point the clients are stopped and a map reduce job is
 * run to ensure no linked list has a hole. A hole indicates data was lost.
 *
 * The nodes in the linked list are random. This causes each linked list to
 * spread across the table. Therefore if one part of a table loses data, then it
 * will be detected by references in another part of the table.
 *
 * THE ANATOMY OF THE TEST
 *
 * Below is a rough sketch of how data is written. For specific details look at
 * the Generator code.
 *
 * 1. Write out 1 million nodes
 * 2. Flush the client
 * 3. Write out 1 million that reference the previous million
 * 4. If this is the 25th set of 1 million nodes, then update the 1st set of
 *    1 million to point to the last
 * 5. Goto 1
 *
 * The key is that nodes only reference flushed nodes. Therefore a node should
 * never reference a missing node, even if the ingest client is killed at any
 * point in time.
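 *
 * A minimal sketch of this write loop, with illustrative names only (the real logic lives in
 * Generator.GeneratorMapper#map and #persist):
 * <pre>
 * byte[][] first = null, prev = null;
 * long written = 0;
 * while (written &lt; numNodes) {
 *   byte[][] current = nextBatchOfRandomKeys();  // 1. write out 1 million nodes...
 *   persist(current, prev);                      //    ...each pointing at the previous batch
 *   flush();                                     // 2. flush the client
 *   if (first == null) first = current;
 *   prev = current;                              // 3. the next batch will reference this one
 *   written += current.length;
 *   if (written % 25_000_000 == 0) {             // 4. every 25th million closes the loop:
 *     circularLeftShift(first);
 *     persist(first, prev);                      //    the 1st set now points at the last
 *     first = null;
 *     prev = null;
 *   }
 * }                                              // 5. goto 1
 * </pre>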
 *
 * When running this test suite with Accumulo there is a script running in
 * parallel called the Agitator that randomly and continuously kills server
 * processes. The outcome was that many data loss bugs were found in Accumulo
 * by doing this. This test suite can also help find bugs that impact uptime
 * and stability when run for days or weeks.
 *
 * This test suite consists of the following:
 * - a few Java programs
 * - a little helper script to run the Java programs
 * - a maven script to build it
 *
 * When generating data, it is best to have each map task generate a multiple of
 * 25 million. The reason for this is that circular linked lists are generated
 * every 25M. Not generating a multiple of 25M will result in some nodes in the
 * linked list not having references. The loss of an unreferenced node cannot
 * be detected.
 *
 *
 * Below is a description of the Java programs
 *
 * Generator - A map only job that generates data. As stated previously, it is best to generate
 * data in multiples of 25M. An option is also available to allow concurrent walkers to select
 * and walk random flushed loops during this phase.
 *
 * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED
 * and UNREFERENCED are ok; any UNDEFINED counts are bad. Do not run at the same time as the
 * Generator.
 *
 * Walker - A standalone program that starts following a linked list and emits timing info.
 *
 * Print - A standalone program that prints nodes in the linked list
 *
 * Delete - A standalone program that deletes a single node
 *
 * This class can be run as a unit test, as an integration test, or from the command line
 *
 * ex:
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
 *    loop 2 1 100000 /temp 1 1000 50 1 0
 *
 */
195@Category(IntegrationTests.class)
196public class IntegrationTestBigLinkedList extends IntegrationTestBase {
197  protected static final byte[] NO_KEY = new byte[1];
198
199  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
200
201  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
202
203  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
204  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
205  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
206
207  //link to the id of the prev node in the linked list
208  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
209
210  //identifier of the mapred task that generated this row
211  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
212
213  //the id of the row within the same client.
214  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
215
  /**
   * How many rows to write per map task. This has to be a multiple of width * wrap multiplier
   * (25M by default).
   */
217  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
218    = "IntegrationTestBigLinkedList.generator.num_rows";
219
220  private static final String GENERATOR_NUM_MAPPERS_KEY
221    = "IntegrationTestBigLinkedList.generator.map.tasks";
222
223  private static final String GENERATOR_WIDTH_KEY
224    = "IntegrationTestBigLinkedList.generator.width";
225
226  private static final String GENERATOR_WRAP_KEY
227    = "IntegrationTestBigLinkedList.generator.wrap";
228
229  private static final String CONCURRENT_WALKER_KEY
230    = "IntegrationTestBigLinkedList.generator.concurrentwalkers";
231
232  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
233
234  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
235
236  private static final int WIDTH_DEFAULT = 1000000;
237  private static final int WRAP_DEFAULT = 25;
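  // With the defaults above, one full wrap is WIDTH_DEFAULT * WRAP_DEFAULT = 25,000,000 nodes;
  // this is the "25M" circular-list size referred to throughout this class.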
238  private static final int ROWKEY_LENGTH = 16;
239
240  private static final int CONCURRENT_WALKER_DEFAULT = 0;
241
242  protected String toRun;
243  protected String[] otherArgs;
244
245  static class CINode {
246    byte[] key;
247    byte[] prev;
248    String client;
249    long count;
250  }
251
252  /**
253   * A Map only job that generates random linked list and stores them.
254   */
255  static class Generator extends Configured implements Tool {
256
257    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
258
259    /**
     * Set this configuration if you want to test that single-column-family flush works. If set,
     * we will add a big column family and a small column family on either side of the usual
     * ITBLL 'meta' column family. When we write out the ITBLL data, we will also write to the
     * big column family a value bigger than the usual ITBLL value and, for the small one,
     * something much smaller. The idea is that when flush-by-column-family rather than
     * flush-by-region is enabled, we can see if ITBLL is broken in any way. Here is how you
     * would pass it:
266     * <p>
267     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
268     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
269     */
270    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
271        "generator.multiple.columnfamilies";
272
273    /**
274     * Set this configuration if you want to scale up the size of test data quickly.
275     * <p>
276     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
277     * -Dgenerator.big.family.value.size=1024 generator 1 10 output
278     */
279    public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";
280
281
282    public static enum Counts {
283      SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION
284    }
285
    public static final String USAGE = "Usage : " + Generator.class.getSimpleName() +
        " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" +
        " <num walker threads>] \n" +
        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
        "walkers will verify random flushed loops during Generation.";
291
292    public Job job;
293
294    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
295      static class GeneratorInputSplit extends InputSplit implements Writable {
296        @Override
297        public long getLength() throws IOException, InterruptedException {
298          return 1;
299        }
300        @Override
301        public String[] getLocations() throws IOException, InterruptedException {
302          return new String[0];
303        }
304        @Override
305        public void readFields(DataInput arg0) throws IOException {
306        }
307        @Override
308        public void write(DataOutput arg0) throws IOException {
309        }
310      }
311
312      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
313        private long count;
314        private long numNodes;
315        private Random64 rand;
316
317        @Override
318        public void close() throws IOException {
319        }
320
321        @Override
322        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
323          byte[] bytes = new byte[ROWKEY_LENGTH];
324          rand.nextBytes(bytes);
325          return new BytesWritable(bytes);
326        }
327
328        @Override
329        public NullWritable getCurrentValue() throws IOException, InterruptedException {
330          return NullWritable.get();
331        }
332
333        @Override
334        public float getProgress() throws IOException, InterruptedException {
335          return (float)(count / (double)numNodes);
336        }
337
338        @Override
339        public void initialize(InputSplit arg0, TaskAttemptContext context)
340            throws IOException, InterruptedException {
341          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
342          // Use Random64 to avoid issue described in HBASE-21256.
343          rand = new Random64();
344        }
345
346        @Override
347        public boolean nextKeyValue() throws IOException, InterruptedException {
348          return count++ < numNodes;
349        }
350
351      }
352
353      @Override
354      public RecordReader<BytesWritable,NullWritable> createRecordReader(
355          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
356        GeneratorRecordReader rr = new GeneratorRecordReader();
357        rr.initialize(split, context);
358        return rr;
359      }
360
361      @Override
362      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
363        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
364
365        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);
366
367        for (int i = 0; i < numMappers; i++) {
368          splits.add(new GeneratorInputSplit());
369        }
370
371        return splits;
372      }
373    }
374
    /**
     * Ensure each output file from the previous job goes, whole and unsplit, to a single mapper
     * in the current job.
     */
376    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
377      @Override
378      protected boolean isSplitable(JobContext context, Path filename) {
379        return false;
380      }
381    }
382
383    /**
384     * Some ASCII art time:
385     * <p>
386     * [ . . . ] represents one batch of random longs of length WIDTH
387     * <pre>
388     *                _________________________
389     *               |                  ______ |
390     *               |                 |      ||
391     *             .-+-----------------+-----.||
392     *             | |                 |     |||
393     * first   = [ . . . . . . . . . . . ]   |||
394     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
395     *             | | | | | | | | | | |     |||
396     * prev    = [ . . . . . . . . . . . ]   |||
397     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
398     *             | | | | | | | | | | |     |||
399     * current = [ . . . . . . . . . . . ]   |||
400     *                                       |||
401     * ...                                   |||
402     *                                       |||
403     * last    = [ . . . . . . . . . . . ]   |||
404     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
405     *             |                 |________||
406     *             |___________________________|
407     * </pre>
408     */
409
410    static class GeneratorMapper
411      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
412
413      byte[][] first = null;
414      byte[][] prev = null;
415      byte[][] current = null;
416      byte[] id;
417      long count = 0;
418      int i;
419      BufferedMutator mutator;
420      Connection connection;
421      long numNodes;
422      long wrap;
423      int width;
424      boolean multipleUnevenColumnFamilies;
425      byte[] tinyValue = new byte[] { 't' };
426      byte[] bigValue = null;
427      Configuration conf;
428
429      volatile boolean walkersStop;
430      int numWalkers;
431      volatile List<Long> flushedLoops = new ArrayList<>();
432      List<Thread> walkers = new ArrayList<>();
433
434      @Override
435      protected void setup(Context context) throws IOException, InterruptedException {
436        id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
437        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
438        instantiateHTable();
439        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
440        current = new byte[this.width][];
441        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
442        this.wrap = (long)wrapMultiplier * width;
443        this.numNodes = context.getConfiguration().getLong(
444            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
445        if (this.numNodes < this.wrap) {
446          this.wrap = this.numNodes;
447        }
448        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
449        this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
450        this.walkersStop = false;
451        this.conf = context.getConfiguration();
452
453        if (multipleUnevenColumnFamilies) {
454          int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
455          int limit = context.getConfiguration().getInt(
456            ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
457            ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);
458
459          Preconditions.checkArgument(
460            n <= limit,
461            "%s(%s) > %s(%s)",
462            BIG_FAMILY_VALUE_SIZE_KEY, n, ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);
463
464          bigValue = new byte[n];
465          ThreadLocalRandom.current().nextBytes(bigValue);
466          LOG.info("Create a bigValue with " + n + " bytes.");
467        }
468
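        // With the defaults (width = 1,000,000, wrap multiplier = 25, so wrap = 25,000,000), the
        // checks below effectively require numNodes to be a positive multiple of 25M, matching
        // the usage string and the class-level documentation.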
469        Preconditions.checkArgument(
470          numNodes > 0,
471          "numNodes(%s) <= 0",
472          numNodes);
473        Preconditions.checkArgument(
474          numNodes % width == 0,
475          "numNodes(%s) mod width(%s) != 0",
476          numNodes, width);
477        Preconditions.checkArgument(
478          numNodes % wrap == 0,
479          "numNodes(%s) mod wrap(%s) != 0",
480          numNodes, wrap
481        );
482      }
483
484      protected void instantiateHTable() throws IOException {
485        mutator = connection.getBufferedMutator(
486            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
487                .writeBufferSize(4 * 1024 * 1024));
488      }
489
490      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
492        joinWalkers();
493        mutator.close();
494        connection.close();
495      }
496
497      @Override
498      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
499        current[i] = new byte[key.getLength()];
500        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
501        if (++i == current.length) {
          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i={}",
            current.length, count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i);
504          persist(output, count, prev, current, id);
505          i = 0;
506
507          if (first == null) {
508            first = current;
509          }
510          prev = current;
511          current = new byte[this.width][];
512
513          count += current.length;
514          output.setStatus("Count " + count);
515
516          if (count % wrap == 0) {
            // This block of code turns the 1 million linked lists of length 25 into one giant
            // circular linked list of 25 million nodes.
519            circularLeftShift(first);
520            persist(output, -1, prev, first, null);
521            // At this point the entire loop has been flushed so we can add one of its nodes to the
522            // concurrent walker
523            if (numWalkers > 0) {
524              addFlushed(key.getBytes());
525              if (walkers.isEmpty()) {
526                startWalkers(numWalkers, conf, output);
527              }
528            }
529            first = null;
530            prev = null;
531          }
532        }
533      }
534
535      private static <T> void circularLeftShift(T[] first) {
536        T ez = first[0];
537        System.arraycopy(first, 1, first, 0, first.length - 1);
538        first[first.length - 1] = ez;
539      }
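
      /*
       * Worked example (illustrative, not part of the original code): with width 3, suppose the
       * first batch was [A0, A1, A2] and the last batch persisted so far is prev = [Z0, Z1, Z2].
       * circularLeftShift(first) turns first into [A1, A2, A0]; persisting that against prev then
       * writes A1.prev = Z0, A2.prev = Z1 and A0.prev = Z2. Walking backwards from Z0 you descend
       * column 0 to A0, jump to Z2, descend column 2 to A2, jump to Z1, descend column 1 to A1,
       * and finally jump back to Z0 -- the 3 parallel chains become one circular list.
       */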
540
541      private void addFlushed(byte[] rowKey) {
542        synchronized (flushedLoops) {
543          flushedLoops.add(Bytes.toLong(rowKey));
544          flushedLoops.notifyAll();
545        }
546      }
547
548      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
549          throws IOException {
550        for (int i = 0; i < current.length; i++) {
551
552          if (i % 100 == 0) {
553            // Tickle progress every so often else maprunner will think us hung
554            output.progress();
555          }
556
557          Put put = new Put(current[i]);
558          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
559
560          if (count >= 0) {
561            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
562          }
563          if (id != null) {
564            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
565          }
566          // See if we are to write multiple columns.
567          if (this.multipleUnevenColumnFamilies) {
568            // Use any column name.
569            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
570            // Use any column name.
571            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
572          }
573          mutator.mutate(put);
574        }
575
576        mutator.flush();
577      }
578
579      private void startWalkers(int numWalkers, Configuration conf, Context context) {
580        LOG.info("Starting " + numWalkers + " concurrent walkers");
581        for (int i = 0; i < numWalkers; i++) {
582          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
583          walker.start();
584          walkers.add(walker);
585        }
586      }
587
588      private void joinWalkers() {
589        walkersStop = true;
590        synchronized (flushedLoops) {
591          flushedLoops.notifyAll();
592        }
593        for (Thread walker : walkers) {
594          try {
595            walker.join();
596          } catch (InterruptedException e) {
597            // no-op
598          }
599        }
600      }
601
602      /**
603       * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by
604       * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are
605       * configured to only log erroneous nodes.
606       */
607
608      public class ContinuousConcurrentWalker implements Runnable {
609
610        ConcurrentWalker walker;
611        Configuration conf;
612        Context context;
613        Random rand;
614
615        public ContinuousConcurrentWalker(Configuration conf, Context context) {
616          this.conf = conf;
617          this.context = context;
618          rand = new Random();
619        }
620
621        @Override
622        public void run() {
623          while (!walkersStop) {
624            try {
625              long node = selectLoop();
626              try {
627                walkLoop(node);
628              } catch (IOException e) {
629                context.getCounter(Counts.IOEXCEPTION).increment(1l);
630                return;
631              }
632            } catch (InterruptedException e) {
633              return;
634            }
635          }
636        }
637
638        private void walkLoop(long node) throws IOException {
639          walker = new ConcurrentWalker(context);
640          walker.setConf(conf);
641          walker.run(node, wrap);
642        }
643
        private long selectLoop() throws InterruptedException {
645          synchronized (flushedLoops) {
646            while (flushedLoops.isEmpty() && !walkersStop) {
647              flushedLoops.wait();
648            }
649            if (walkersStop) {
650              throw new InterruptedException();
651            }
652            return flushedLoops.get(rand.nextInt(flushedLoops.size()));
653          }
654        }
655      }
656
657      public static class ConcurrentWalker extends WalkerBase {
658
659        Context context;
660
        public ConcurrentWalker(Context context) {
          this.context = context;
        }
662
663        public void run(long startKeyIn, long maxQueriesIn) throws IOException {
664
665          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
666          byte[] startKey = Bytes.toBytes(startKeyIn);
667
668          Connection connection = ConnectionFactory.createConnection(getConf());
669          Table table = connection.getTable(getTableName(getConf()));
670          long numQueries = 0;
671          // If isSpecificStart is set, only walk one list from that particular node.
672          // Note that in case of circular (or P-shaped) list it will walk forever, as is
673          // the case in normal run without startKey.
674
675          CINode node = findStartNode(table, startKey);
676          if (node == null) {
677            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
678            throw new IOException("Start node not found: " + startKeyIn);
679          }
680          while (numQueries < maxQueries) {
681            numQueries++;
682            byte[] prev = node.prev;
683            long t1 = System.currentTimeMillis();
684            node = getNode(prev, table, node);
685            long t2 = System.currentTimeMillis();
686            if (node == null) {
687              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
688              context.getCounter(Counts.UNDEFINED).increment(1l);
689            } else if (node.prev.length == NO_KEY.length) {
690              LOG.error("ConcurrentWalker found TERMINATING NODE: " +
691                  Bytes.toStringBinary(node.key));
692              context.getCounter(Counts.TERMINATING).increment(1l);
693            } else {
694              // Increment for successful walk
695              context.getCounter(Counts.SUCCESS).increment(1l);
696            }
697          }
698          table.close();
699          connection.close();
700        }
701      }
702    }
703
704    @Override
705    public int run(String[] args) throws Exception {
706      if (args.length < 3) {
707        System.err.println(USAGE);
708        return 1;
709      }
710      try {
711        int numMappers = Integer.parseInt(args[0]);
712        long numNodes = Long.parseLong(args[1]);
713        Path tmpOutput = new Path(args[2]);
714        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
715        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
716        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
717        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
718      } catch (NumberFormatException e) {
719        System.err.println("Parsing generator arguments failed: " + e.getMessage());
720        System.err.println(USAGE);
721        return 1;
722      }
723    }
724
725    protected void createSchema() throws IOException {
726      Configuration conf = getConf();
727      TableName tableName = getTableName(conf);
728      try (Connection conn = ConnectionFactory.createConnection(conf);
729          Admin admin = conn.getAdmin()) {
730        if (!admin.tableExists(tableName)) {
731          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
732          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
733          // Always add these families. Just skip writing to them when we do not test per CF flush.
734          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
735          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
736          // if -DuseMob=true force all data through mob path.
737          if (conf.getBoolean("useMob", false)) {
738            for (HColumnDescriptor hcd : htd.getColumnFamilies() ) {
739              hcd.setMobEnabled(true);
740              hcd.setMobThreshold(4);
741            }
742          }
743
744          // If we want to pre-split compute how many splits.
745          if (conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
746              HBaseTestingUtility.PRESPLIT_TEST_TABLE)) {
747            int numberOfServers =
748                admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
749                    .getLiveServerMetrics().size();
750            if (numberOfServers == 0) {
751              throw new IllegalStateException("No live regionservers");
752            }
753            int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
754                HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
755            int totalNumberOfRegions = numberOfServers * regionsPerServer;
756            LOG.info("Number of live regionservers: " + numberOfServers + ", " +
757                "pre-splitting table into " + totalNumberOfRegions + " regions " +
758                "(default regions per server: " + regionsPerServer + ")");
759
760
761            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
762
763            admin.createTable(htd, splits);
764          } else {
765            // Looks like we're just letting things play out.
          // Create a table with one region by default.
767            // This will make the splitting work hard.
768            admin.createTable(htd);
769          }
770        }
771      } catch (MasterNotRunningException e) {
772        LOG.error("Master not running", e);
773        throw new IOException(e);
774      }
775    }
776
777    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
778        Integer width, Integer wrapMultiplier, Integer numWalkers)
779        throws Exception {
780      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
781          + ", numNodes=" + numNodes);
782      Job job = Job.getInstance(getConf());
783
784      job.setJobName("Random Input Generator");
785      job.setNumReduceTasks(0);
786      job.setJarByClass(getClass());
787
788      job.setInputFormatClass(GeneratorInputFormat.class);
789      job.setOutputKeyClass(BytesWritable.class);
790      job.setOutputValueClass(NullWritable.class);
791
792      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
793
794      job.setMapperClass(Mapper.class); //identity mapper
795
796      FileOutputFormat.setOutputPath(job, tmpOutput);
797      job.setOutputFormatClass(SequenceFileOutputFormat.class);
798      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
799
800      boolean success = jobCompletion(job);
801
802      return success ? 0 : 1;
803    }
804
805    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
806        Integer width, Integer wrapMultiplier, Integer numWalkers)
807        throws Exception {
808      LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
809      createSchema();
810      job = Job.getInstance(getConf());
811
812      job.setJobName("Link Generator");
813      job.setNumReduceTasks(0);
814      job.setJarByClass(getClass());
815
816      FileInputFormat.setInputPaths(job, tmpOutput);
817      job.setInputFormatClass(OneFilePerMapperSFIF.class);
818      job.setOutputKeyClass(NullWritable.class);
819      job.setOutputValueClass(NullWritable.class);
820
821      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
822
823      setMapperForGenerator(job);
824
825      job.setOutputFormatClass(NullOutputFormat.class);
826
827      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
828      TableMapReduceUtil.addDependencyJars(job);
829      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
830                                                     AbstractHBaseTool.class);
831      TableMapReduceUtil.initCredentials(job);
832
833      boolean success = jobCompletion(job);
834
835      return success ? 0 : 1;
836    }
837
838    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
839        ClassNotFoundException {
840      boolean success = job.waitForCompletion(true);
841      return success;
842    }
843
844    protected void setMapperForGenerator(Job job) {
845      job.setMapperClass(GeneratorMapper.class);
846    }
847
848    public int run(int numMappers, long numNodes, Path tmpOutput,
849        Integer width, Integer wrapMultiplier, Integer numWalkers)
850        throws Exception {
851      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier,
852          numWalkers);
853      if (ret > 0) {
854        return ret;
855      }
856      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
857    }
858
859    public boolean verify() {
860      try {
861        Counters counters = job.getCounters();
862        if (counters == null) {
          LOG.info("Counters object was null; Generator verification cannot be performed."
864              + " This is commonly a result of insufficient YARN configuration.");
865          return false;
866        }
867
868        if (counters.findCounter(Counts.TERMINATING).getValue() > 0 ||
869            counters.findCounter(Counts.UNDEFINED).getValue() > 0 ||
870            counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) {
871          LOG.error("Concurrent walker failed to verify during Generation phase");
872          LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue());
873          LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue());
874          LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue());
875          return false;
876        }
877      } catch (IOException e) {
878        LOG.info("Generator verification could not find counter");
879        return false;
880      }
881      return true;
882    }
883  }
884
  /**
   * Tool to search for missing rows in WALs and hfiles.
   * Pass in a file or dir of keys to search for. The key file must have been written by the
   * Verify step (we depend on the format it writes out). We'll read the keys in and then search
   * the hbase WALs and oldWALs dirs (some of this is TODO).
   */
891  static class Search extends Configured implements Tool {
892    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
893    protected Job job;
894
895    private static void printUsage(final String error) {
896      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
897      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
898    }
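
    // Example invocation (illustrative; KEYS_DIR is the output dir written by a previous Verify
    // run, from which the UNDEFINED keys to search for are read):
    //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList search /tmp/itbll-verify 4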
899
900    @Override
901    public int run(String[] args) throws Exception {
902      if (args.length < 1 || args.length > 2) {
903        printUsage(null);
904        return 1;
905      }
906      Path inputDir = new Path(args[0]);
907      int numMappers = 1;
908      if (args.length > 1) {
909        numMappers = Integer.parseInt(args[1]);
910      }
911      return run(inputDir, numMappers);
912    }
913
914    /**
915     * WALPlayer override that searches for keys loaded in the setup.
916     */
917    public static class WALSearcher extends WALPlayer {
918      public WALSearcher(Configuration conf) {
919        super(conf);
920      }
921
922      /**
923       * The actual searcher mapper.
924       */
925      public static class WALMapperSearcher extends WALMapper {
926        private SortedSet<byte []> keysToFind;
927        private AtomicInteger rows = new AtomicInteger(0);
928
929        @Override
930        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
931            throws IOException {
932          super.setup(context);
933          try {
934            this.keysToFind = readKeysToSearch(context.getConfiguration());
935            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
936          } catch (InterruptedException e) {
937            throw new InterruptedIOException(e.toString());
938          }
939        }
940
941        @Override
942        protected boolean filter(Context context, Cell cell) {
943          // TODO: Can I do a better compare than this copying out key?
944          byte [] row = new byte [cell.getRowLength()];
945          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
946          boolean b = this.keysToFind.contains(row);
947          if (b) {
948            String keyStr = Bytes.toStringBinary(row);
949            try {
950              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
951            } catch (IOException|InterruptedException e) {
952              LOG.warn(e.toString(), e);
953            }
954            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
955              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
956            }
957            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
958          }
959          return b;
960        }
961      }
962
963      // Put in place the above WALMapperSearcher.
964      @Override
965      public Job createSubmittableJob(String[] args) throws IOException {
966        Job job = super.createSubmittableJob(args);
967        // Call my class instead.
968        job.setJarByClass(WALMapperSearcher.class);
969        job.setMapperClass(WALMapperSearcher.class);
970        job.setOutputFormatClass(NullOutputFormat.class);
971        return job;
972      }
973    }
974
975    static final String FOUND_GROUP_KEY = "Found";
976    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
977
978    public int run(Path inputDir, int numMappers) throws Exception {
979      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
980      SortedSet<byte []> keys = readKeysToSearch(getConf());
981      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
982      LOG.info("Count of keys to find: " + keys.size());
983      for(byte [] key: keys)  LOG.info("Key: " + Bytes.toStringBinary(key));
984      // Now read all WALs. In two dirs. Presumes certain layout.
985      Path walsDir = new Path(
986          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
987      Path oldWalsDir = new Path(
988          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
989      LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
990        " against " + getConf().get(HConstants.HBASE_DIR));
991      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
992          new String [] {walsDir.toString(), ""});
993      if (ret != 0) {
994        return ret;
995      }
996      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
997          new String [] {oldWalsDir.toString(), ""});
998    }
999
1000    static SortedSet<byte []> readKeysToSearch(final Configuration conf)
1001    throws IOException, InterruptedException {
1002      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
1003      FileSystem fs = FileSystem.get(conf);
1004      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1005      if (!fs.exists(keysInputDir)) {
1006        throw new FileNotFoundException(keysInputDir.toString());
1007      }
1008      if (!fs.isDirectory(keysInputDir)) {
1009        throw new UnsupportedOperationException("TODO");
1010      } else {
1011        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
1012        while(iterator.hasNext()) {
1013          LocatedFileStatus keyFileStatus = iterator.next();
1014          // Skip "_SUCCESS" file.
1015          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
1016          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
1017        }
1018      }
1019      return result;
1020    }
1021
1022    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
1023        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
1024        InterruptedException {
1025      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1026      // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
1027      // what is missing.
1028      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
1029      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
1030          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
1031        InputSplit is =
1032          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
1033        rr.initialize(is, context);
1034        while (rr.nextKeyValue()) {
1035          rr.getCurrentKey();
1036          BytesWritable bw = rr.getCurrentValue();
1037          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
1038            byte[] key = new byte[rr.getCurrentKey().getLength()];
1039            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
1040                .getLength());
1041            result.add(key);
1042          }
1043        }
1044      }
1045      return result;
1046    }
1047  }
1048
1049  /**
1050   * A Map Reduce job that verifies that the linked lists generated by
1051   * {@link Generator} do not have any holes.
1052   */
1053  static class Verify extends Configured implements Tool {
1054
1055    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
1056    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
1057    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
1058
1059    protected Job job;
1060
1061    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
1062      private BytesWritable row = new BytesWritable();
1063      private BytesWritable ref = new BytesWritable();
1064
1065      private boolean multipleUnevenColumnFamilies;
1066
1067      @Override
1068      protected void setup(
1069          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
1070          throws IOException, InterruptedException {
1071        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
1072      }
1073
1074      @Override
1075      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
1077        byte[] rowKey = key.get();
1078        row.set(rowKey, 0, rowKey.length);
1079        if (multipleUnevenColumnFamilies
1080            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
1081              TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
1082          context.write(row, DEF_LOST_FAMILIES);
1083        } else {
1084          context.write(row, DEF);
1085        }
1086        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
1087        if (prev != null && prev.length > 0) {
1088          ref.set(prev, 0, prev.length);
1089          context.write(ref, row);
1090        } else {
1091          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
1092        }
1093      }
1094    }
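
    /*
     * Sketch (illustrative, not part of the original code) of what VerifyMapper emits for a row R
     * whose 'prev' column points at P:
     *
     *   context.write(R, DEF);   // "R is defined"
     *   context.write(P, R);     // "R references P"
     *
     * In the reducer, a key that has referencing rows but no DEF marker is an UNDEFINED (lost)
     * node, while a key with a DEF marker but no references is merely UNREFERENCED.
     */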
1095
    /**
     * Don't change the order of these enums. Their ordinals are used as a type flag when we emit
     * problems found by the reducer.
     */
1100    public static enum Counts {
1101      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
1102      LOST_FAMILIES
1103    }
1104
    /**
     * Per reducer, we output problem rows as byte arrays so they can be used as input for
     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a flag saying
     * what sort of emission it is; the flag is the Counts enum ordinal written out as a short
     * (Bytes.SIZEOF_SHORT bytes).
     */
1110    public static class VerifyReducer extends
1111        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
1112      private ArrayList<byte[]> refs = new ArrayList<>();
1113      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
1114        Counts.UNREFERENCED.ordinal(), new byte[] {}));
1115      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
1116        Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
1117
1118      private AtomicInteger rows = new AtomicInteger(0);
1119      private Connection connection;
1120
1121      @Override
1122      protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1123      throws IOException, InterruptedException {
1124        super.setup(context);
1125        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
1126      }
1127
1128      @Override
1129      protected void cleanup(
1130          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1131          throws IOException, InterruptedException {
1132        if (this.connection != null) {
1133          this.connection.close();
1134        }
1135        super.cleanup(context);
1136      }
1137
      /**
       * @param ordinal the {@link Counts} ordinal to use as the type flag
       * @param r the row bytes to prefix
       * @return a new byte array that has <code>ordinal</code> as a prefix on the front, taking
       * up Bytes.SIZEOF_SHORT bytes, followed by <code>r</code>
       */
1144      public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
1145        byte[] prefix = Bytes.toBytes((short)ordinal);
1146        if (prefix.length != Bytes.SIZEOF_SHORT) {
1147          throw new RuntimeException("Unexpected size: " + prefix.length);
1148        }
1149        byte[] result = new byte[prefix.length + r.length];
1150        System.arraycopy(prefix, 0, result, 0, prefix.length);
1151        System.arraycopy(r, 0, result, prefix.length, r.length);
1152        return result;
1153      }
1154
      /**
       * @param bs flagged row bytes as written by {@link #addPrefixFlag(int, byte[])}
       * @return the type from the Counts enum for this row; reads the prefix added by
       * {@link #addPrefixFlag(int, byte[])}
       */
1160      public static Counts whichType(final byte [] bs) {
1161        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
1162        return Counts.values()[ordinal];
1163      }
1164
      /**
       * @param bw flagged row bytes
       * @return the row bytes minus the type flag
       */
1169      public static byte[] getRowOnly(BytesWritable bw) {
1170        byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
1171        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
1172        return bytes;
1173      }
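
      /*
       * Illustrative round trip (not part of the original code) showing how the flag prefix is
       * applied and read back:
       *
       *   byte[] row = Bytes.toBytes("some-row");
       *   byte[] flagged = addPrefixFlag(Counts.UNDEFINED.ordinal(), row);
       *   // flagged = { 0x00, 0x01, 's', 'o', 'm', 'e', ... }: a 2-byte short prefix, then row
       *   assert whichType(flagged) == Counts.UNDEFINED;
       *   assert Bytes.equals(getRowOnly(new BytesWritable(flagged)), row);
       */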
1174
1175      @Override
1176      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
1177          throws IOException, InterruptedException {
1178        int defCount = 0;
1179        boolean lostFamilies = false;
1180        refs.clear();
1181        for (BytesWritable type : values) {
1182          if (type.getLength() == DEF.getLength()) {
1183            defCount++;
1184            if (type.getBytes()[0] == 1) {
1185              lostFamilies = true;
1186            }
1187          } else {
1188            byte[] bytes = new byte[type.getLength()];
1189            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
1190            refs.add(bytes);
1191          }
1192        }
1193
1194        // TODO check for more than one def, should not happen
1195        StringBuilder refsSb = null;
1196        if (defCount == 0 || refs.size() != 1) {
1197          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1198          refsSb = dumpExtraInfoOnRefs(key, context, refs);
1199          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
1200            (refsSb != null? refsSb.toString(): ""));
1201        }
1202        if (lostFamilies) {
1203          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1204          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
1205          context.getCounter(Counts.LOST_FAMILIES).increment(1);
1206          context.write(key, LOSTFAM);
1207        }
1208
1209        if (defCount == 0 && refs.size() > 0) {
1210          // This is bad, found a node that is referenced but not defined. It must have been
1211          // lost, emit some info about this node for debugging purposes.
          // Write out a line per reference. If more than one, flag it.
1213          for (int i = 0; i < refs.size(); i++) {
1214            byte[] bs = refs.get(i);
1215            int ordinal;
1216            if (i <= 0) {
1217              ordinal = Counts.UNDEFINED.ordinal();
1218              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1219              context.getCounter(Counts.UNDEFINED).increment(1);
1220            } else {
1221              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
1222              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1223            }
1224          }
1225          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            // Print out the missing row; doing a get on the reference gives info on when the
            // referencer was added, which can help a little with debugging. That info is only
            // available in the mapper output -- the 'LinkedListError: key...' log message above.
            // What we emit here is useless for debugging.
1230            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1231            context.getCounter("undef", keyString).increment(1);
1232          }
1233        } else if (defCount > 0 && refs.isEmpty()) {
1234          // node is defined but not referenced
1235          context.write(key, UNREF);
1236          context.getCounter(Counts.UNREFERENCED).increment(1);
1237          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1238            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1239            context.getCounter("unref", keyString).increment(1);
1240          }
1241        } else {
1242          if (refs.size() > 1) {
1243            // Skip first reference.
1244            for (int i = 1; i < refs.size(); i++) {
1245              context.write(key,
1246                new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
1247            }
1248            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
1249          }
1250          // node is defined and referenced
1251          context.getCounter(Counts.REFERENCED).increment(1);
1252        }
1253      }
1254
1255      /**
1256       * Dump out extra info around references if there are any. Helps debugging.
1257       * @return StringBuilder filled with references if any.
1258       * @throws IOException
1259       */
1260      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1261          final List<byte []> refs)
1262      throws IOException {
1263        StringBuilder refsSb = null;
1264        if (refs.isEmpty()) return refsSb;
1265        refsSb = new StringBuilder();
1266        String comma = "";
1267        // If a row is a reference but has no define, print the content of the row that has
1268        // this row as a 'prev'; it will help debug.  The missing row was written just before
1269        // the row we are dumping out here.
1270        TableName tn = getTableName(context.getConfiguration());
1271        try (Table t = this.connection.getTable(tn)) {
1272          for (byte [] ref : refs) {
1273            Result r = t.get(new Get(ref));
1274            List<Cell> cells = r.listCells();
1275            String ts = (cells != null && !cells.isEmpty())?
1276                new java.util.Date(cells.get(0).getTimestamp()).toString(): "";
1277            byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1278            String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
1279            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1280            long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
1281            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1282            String refRegionLocation = "";
1283            String keyRegionLocation = "";
1284            if (b != null && b.length > 0) {
1285              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1286                HRegionLocation hrl = rl.getRegionLocation(b);
1287                if (hrl != null) refRegionLocation = hrl.toString();
1288                // Key here probably has trailing zeros on it.
1289                hrl = rl.getRegionLocation(key.getBytes());
1290                if (hrl != null) keyRegionLocation = hrl.toString();
1291              }
1292            }
1293            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
1294              ", refPrevEqualsKey=" +
1295                (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
1296                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
1297                ", ref row date=" + ts + ", jobStr=" + jobStr +
1298                ", ref row count=" + count +
1299                ", ref row regionLocation=" + refRegionLocation +
1300                ", key row regionLocation=" + keyRegionLocation);
1301            refsSb.append(comma);
1302            comma = ",";
1303            refsSb.append(Bytes.toStringBinary(ref));
1304          }
1305        }
1306        return refsSb;
1307      }
1308    }
1309
1310    @Override
1311    public int run(String[] args) throws Exception {
1312      if (args.length != 2) {
1313        System.out.println("Usage : " + Verify.class.getSimpleName()
1314            + " <output dir> <num reducers>");
1315        return 0;
1316      }
1317
1318      String outputDir = args[0];
1319      int numReducers = Integer.parseInt(args[1]);
1320
1321      return run(outputDir, numReducers);
1322    }
1323
1324    public int run(String outputDir, int numReducers) throws Exception {
1325      return run(new Path(outputDir), numReducers);
1326    }
1327
1328    public int run(Path outputDir, int numReducers) throws Exception {
1329      LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);
1330
1331      job = Job.getInstance(getConf());
1332
1333      job.setJobName("Link Verifier");
1334      job.setNumReduceTasks(numReducers);
1335      job.setJarByClass(getClass());
1336
1337      setJobScannerConf(job);
1338
1339      Scan scan = new Scan();
1340      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
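      // One-shot, full-table scan: use a large scanner caching value and skip the block cache so
      // the verify pass does not churn the region servers' block caches.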
1341      scan.setCaching(10000);
1342      scan.setCacheBlocks(false);
1343      if (isMultiUnevenColumnFamilies(getConf())) {
1344        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1345        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1346      }
1347
1348      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1349          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1350      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
1351                                                     AbstractHBaseTool.class);
1352
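      // Disable speculative execution so regions are not redundantly scanned by duplicate map
      // attempts during verification.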
1353      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1354
1355      job.setReducerClass(VerifyReducer.class);
1356      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1357      job.setOutputKeyClass(BytesWritable.class);
1358      job.setOutputValueClass(BytesWritable.class);
1359      TextOutputFormat.setOutputPath(job, outputDir);
1360
1361      boolean success = job.waitForCompletion(true);
1362
1363      if (success) {
1364        Counters counters = job.getCounters();
1365        if (null == counters) {
1366          LOG.warn("Counters were null, cannot verify Job completion."
1367              + " This is commonly a result of insufficient YARN configuration.");
1368          // We don't have access to the counters to know if we have "bad" counts
1369          return 0;
1370        }
1371
1372        // If we find no unexpected values, the job didn't outright fail
1373        if (verifyUnexpectedValues(counters)) {
1374          // We didn't check referenced+unreferenced counts, leave that to visual inspection
1375          return 0;
1376        }
1377      }
1378
1379      // We failed
1380      return 1;
1381    }
1382
1383    public boolean verify(long expectedReferenced) throws Exception {
1384      if (job == null) {
1385        throw new IllegalStateException("You should call run() first");
1386      }
1387
1388      Counters counters = job.getCounters();
1389      if (counters == null) {
1390        LOG.info("Counters object was null, write verification cannot be performed."
1391              + " This is commonly a result of insufficient YARN configuration.");
1392        return false;
1393      }
1394
1395      // Run through each check, even if we fail one early
1396      boolean success = verifyExpectedValues(expectedReferenced, counters);
1397
1398      if (!verifyUnexpectedValues(counters)) {
1399        // We found counter objects which imply failure
1400        success = false;
1401      }
1402
1403      if (!success) {
1404        handleFailure(counters);
1405      }
1406      return success;
1407    }
1408
1409    /**
1410     * Verify the values in the Counters against the expected number of entries written.
1411     *
1412     * @param expectedReferenced
     *          Expected number of referenced entries
1414     * @param counters
1415     *          The Job's Counters object
1416     * @return True if the values match what's expected, false otherwise
1417     */
1418    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1419      final Counter referenced = counters.findCounter(Counts.REFERENCED);
1420      final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
1421      boolean success = true;
1422
1423      if (expectedReferenced != referenced.getValue()) {
1424        LOG.error("Expected referenced count does not match with actual referenced count. " +
1425            "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
1426        success = false;
1427      }
1428
1429      if (unreferenced.getValue() > 0) {
1430        final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
1431        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
1432        LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
1433            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1434        success = false;
1435      }
1436
1437      return success;
1438    }
1439
1440    /**
1441     * Verify that the Counters don't contain values which indicate an outright failure from the Reducers.
1442     *
1443     * @param counters
1444     *          The Job's counters
1445     * @return True if the "bad" counter objects are 0, false otherwise
1446     */
1447    protected boolean verifyUnexpectedValues(Counters counters) {
1448      final Counter undefined = counters.findCounter(Counts.UNDEFINED);
1449      final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
1450      boolean success = true;
1451
1452      if (undefined.getValue() > 0) {
1453        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1454        success = false;
1455      }
1456
1457      if (lostfamilies.getValue() > 0) {
1458        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1459        success = false;
1460      }
1461
1462      return success;
1463    }
1464
1465    protected void handleFailure(Counters counters) throws IOException {
1466      Configuration conf = job.getConfiguration();
1467      TableName tableName = getTableName(conf);
1468      try (Connection conn = ConnectionFactory.createConnection(conf)) {
1469        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
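          // The Verify reducer recorded each problem key (up to MISSING_ROWS_TO_LOG of them) as a
          // counter in the "undef" / "unref" groups. Resolve each key back to its region location
          // so a failure can be tied to a specific region and server.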
1470          CounterGroup g = counters.getGroup("undef");
1471          Iterator<Counter> it = g.iterator();
1472          while (it.hasNext()) {
1473            String keyString = it.next().getName();
1474            byte[] key = Bytes.toBytes(keyString);
1475            HRegionLocation loc = rl.getRegionLocation(key, true);
1476            LOG.error("undefined row " + keyString + ", " + loc);
1477          }
1478          g = counters.getGroup("unref");
1479          it = g.iterator();
1480          while (it.hasNext()) {
1481            String keyString = it.next().getName();
1482            byte[] key = Bytes.toBytes(keyString);
1483            HRegionLocation loc = rl.getRegionLocation(key, true);
1484            LOG.error("unreferred row " + keyString + ", " + loc);
1485          }
1486        }
1487      }
1488    }
1489  }
1490
1491  /**
1492   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
1493   * adds more data.
1494   */
1495  static class Loop extends Configured implements Tool {
1496
1497    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " +
        "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" +
        " <num walker threads>] \n" +
        "where <num nodes per mapper> should be a multiple of width*wrap multiplier, 25M by default. \n" +
        "Walkers will select and verify random flushed loops during Generation.";
1503
1504    IntegrationTestBigLinkedList it;
1505
1506    protected void runGenerator(int numMappers, long numNodes,
1507        String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers)
1508        throws Exception {
1509      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // Unique subdirectory name for this iteration's output.
1511      Path generatorOutput = new Path(outputPath, uuid.toString());
1512
1513      Generator generator = new Generator();
1514      generator.setConf(getConf());
1515      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
1516          numWalkers);
1517      if (retCode > 0) {
1518        throw new RuntimeException("Generator failed with return code: " + retCode);
1519      }
1520      if (numWalkers > 0) {
1521        if (!generator.verify()) {
1522          throw new RuntimeException("Generator.verify failed");
1523        }
1524      }
1525    }
1526
1527    protected void runVerify(String outputDir,
1528        int numReducers, long expectedNumNodes) throws Exception {
1529      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // Unique subdirectory name for this iteration's output.
1531      Path iterationOutput = new Path(outputPath, uuid.toString());
1532
1533      Verify verify = new Verify();
1534      verify.setConf(getConf());
1535      int retCode = verify.run(iterationOutput, numReducers);
1536      if (retCode > 0) {
1537        throw new RuntimeException("Verify.run failed with return code: " + retCode);
1538      }
1539
1540      if (!verify.verify(expectedNumNodes)) {
1541        throw new RuntimeException("Verify.verify failed");
1542      }
1543      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
1544    }
1545
1546    @Override
1547    public int run(String[] args) throws Exception {
1548      if (args.length < 5) {
1549        System.err.println(USAGE);
1550        return 1;
1551      }
1552      try {
1553        int numIterations = Integer.parseInt(args[0]);
1554        int numMappers = Integer.parseInt(args[1]);
1555        long numNodes = Long.parseLong(args[2]);
1556        String outputDir = args[3];
1557        int numReducers = Integer.parseInt(args[4]);
1558        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1559        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1560        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
1561
1562        long expectedNumNodes = 0;
1563
1564        if (numIterations < 0) {
1565          numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
1566        }
1567        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
1568        for (int i = 0; i < numIterations; i++) {
1569          LOG.info("Starting iteration = " + i);
1570          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
1571          expectedNumNodes += numMappers * numNodes;
1572          runVerify(outputDir, numReducers, expectedNumNodes);
1573        }
1574        return 0;
1575      } catch (NumberFormatException e) {
1576        System.err.println("Parsing loop arguments failed: " + e.getMessage());
1577        System.err.println(USAGE);
1578        return 1;
1579      }
1580    }
1581  }
1582
1583  /**
1584   * A stand alone program that prints out portions of a list created by {@link Generator}
1585   */
1586  private static class Print extends Configured implements Tool {
1587    @Override
1588    public int run(String[] args) throws Exception {
1589      Options options = new Options();
1590      options.addOption("s", "start", true, "start key");
1591      options.addOption("e", "end", true, "end key");
1592      options.addOption("l", "limit", true, "number to print");
1593
1594      GnuParser parser = new GnuParser();
1595      CommandLine cmd = null;
1596      try {
1597        cmd = parser.parse(options, args);
1598        if (cmd.getArgs().length != 0) {
1599          throw new ParseException("Command takes no arguments");
1600        }
1601      } catch (ParseException e) {
1602        System.err.println("Failed to parse command line " + e.getMessage());
1603        System.err.println();
1604        HelpFormatter formatter = new HelpFormatter();
1605        formatter.printHelp(getClass().getSimpleName(), options);
1606        System.exit(-1);
1607      }
1608
1609      Connection connection = ConnectionFactory.createConnection(getConf());
1610      Table table = connection.getTable(getTableName(getConf()));
1611
1612      Scan scan = new Scan();
1613      scan.setBatch(10000);
1614
1615      if (cmd.hasOption("s"))
1616        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
1617
1618      if (cmd.hasOption("e"))
1619        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
1620
1621      int limit = 0;
1622      if (cmd.hasOption("l"))
1623        limit = Integer.parseInt(cmd.getOptionValue("l"));
1624      else
1625        limit = 100;
1626
1627      ResultScanner scanner = table.getScanner(scan);
1628
1629      CINode node = new CINode();
1630      Result result = scanner.next();
1631      int count = 0;
1632      while (result != null && count++ < limit) {
1633        node = getCINode(result, node);
1634        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
1635            Bytes.toStringBinary(node.prev), node.count, node.client);
1636        result = scanner.next();
1637      }
1638      scanner.close();
1639      table.close();
1640      connection.close();
1641
1642      return 0;
1643    }
1644  }
1645
1646  /**
1647   * A stand alone program that deletes a single node.
1648   */
1649  private static class Delete extends Configured implements Tool {
1650    @Override
1651    public int run(String[] args) throws Exception {
1652      if (args.length != 1) {
1653        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
1654        return 0;
1655      }
1656      byte[] val = Bytes.toBytesBinary(args[0]);
1657
1658      org.apache.hadoop.hbase.client.Delete delete
1659        = new org.apache.hadoop.hbase.client.Delete(val);
1660
1661      try (Connection connection = ConnectionFactory.createConnection(getConf());
1662          Table table = connection.getTable(getTableName(getConf()))) {
1663        table.delete(delete);
1664      }
1665
1666      System.out.println("Delete successful");
1667      return 0;
1668    }
1669  }
1670
  abstract static class WalkerBase extends Configured {
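    /**
     * Scan forward from startKey and return the first node found, or null if the scan returns
     * nothing. The time taken by the lookup is printed on an "FSR" line.
     */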
1672    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
1673      Scan scan = new Scan();
1674      scan.setStartRow(startKey);
1675      scan.setBatch(1);
1676      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1677
1678      long t1 = System.currentTimeMillis();
1679      ResultScanner scanner = table.getScanner(scan);
1680      Result result = scanner.next();
1681      long t2 = System.currentTimeMillis();
1682      scanner.close();
1683
      if (result != null) {
1685        CINode node = getCINode(result, new CINode());
1686        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1687        return node;
1688      }
1689
1690      System.out.println("FSR " + (t2 - t1));
1691
1692      return null;
1693    }

    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1695      Get get = new Get(row);
1696      get.addColumn(FAMILY_NAME, COLUMN_PREV);
1697      Result result = table.get(get);
1698      return getCINode(result, node);
1699    }
1700  }
1701  /**
1702   * A stand alone program that follows a linked list created by {@link Generator} and prints
1703   * timing info.
1704   */
1705  private static class Walker extends WalkerBase implements Tool {
1706
    public Walker() {}
1708
1709    @Override
1710    public int run(String[] args) throws IOException {
1711
1712      Options options = new Options();
1713      options.addOption("n", "num", true, "number of queries");
1714      options.addOption("s", "start", true, "key to start at, binary string");
1715      options.addOption("l", "logevery", true, "log every N queries");
1716
1717      GnuParser parser = new GnuParser();
1718      CommandLine cmd = null;
1719      try {
1720        cmd = parser.parse(options, args);
1721        if (cmd.getArgs().length != 0) {
1722          throw new ParseException("Command takes no arguments");
1723        }
1724      } catch (ParseException e) {
1725        System.err.println("Failed to parse command line " + e.getMessage());
1726        System.err.println();
1727        HelpFormatter formatter = new HelpFormatter();
1728        formatter.printHelp(getClass().getSimpleName(), options);
1729        System.exit(-1);
1730      }
1731
1732      long maxQueries = Long.MAX_VALUE;
1733      if (cmd.hasOption('n')) {
1734        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1735      }
1736      Random rand = new SecureRandom();
1737      boolean isSpecificStart = cmd.hasOption('s');
1738
1739      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1740      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1741
1742      Connection connection = ConnectionFactory.createConnection(getConf());
1743      Table table = connection.getTable(getTableName(getConf()));
1744      long numQueries = 0;
1745      // If isSpecificStart is set, only walk one list from that particular node.
1746      // Note that in case of circular (or P-shaped) list it will walk forever, as is
1747      // the case in normal run without startKey.
1748      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1749        if (!isSpecificStart) {
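          // No explicit start key: pick ROWKEY_LENGTH random bytes. findStartNode() scans forward
          // from them to the first real row, so any random byte string is a usable starting point.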
1750          startKey = new byte[ROWKEY_LENGTH];
1751          rand.nextBytes(startKey);
1752        }
1753        CINode node = findStartNode(table, startKey);
1754        if (node == null && isSpecificStart) {
1755          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1756        }
1757        numQueries++;
1758        while (node != null && node.prev.length != NO_KEY.length &&
1759            numQueries < maxQueries) {
1760          byte[] prev = node.prev;
1761          long t1 = System.currentTimeMillis();
1762          node = getNode(prev, table, node);
1763          long t2 = System.currentTimeMillis();
1764          if (logEvery > 0 && numQueries % logEvery == 0) {
1765            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1766          }
1767          numQueries++;
1768          if (node == null) {
1769            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1770          } else if (node.prev.length == NO_KEY.length) {
1771            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1772          }
1773        }
1774      }
1775      table.close();
1776      connection.close();
1777      return 0;
1778    }
1779  }
1780
1781  private static class Clean extends Configured implements Tool {
1782    @Override public int run(String[] args) throws Exception {
1783      if (args.length < 1) {
1784        System.err.println("Usage: Clean <output dir>");
1785        return -1;
1786      }
1787
1788      Path p = new Path(args[0]);
1789      Configuration conf = getConf();
1790      TableName tableName = getTableName(conf);
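      // Remove both the test table (disable, then delete) and any job output left under the
      // supplied path.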
1791      try (FileSystem fs = HFileSystem.get(conf);
1792          Connection conn = ConnectionFactory.createConnection(conf);
1793          Admin admin = conn.getAdmin()) {
1794        if (admin.tableExists(tableName)) {
1795          admin.disableTable(tableName);
1796          admin.deleteTable(tableName);
1797        }
1798
1799        if (fs.exists(p)) {
1800          fs.delete(p, true);
1801        }
1802      }
1803
1804      return 0;
1805    }
1806  }
1807
1808  static TableName getTableName(Configuration conf) {
1809    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1810  }
1811
1812  private static CINode getCINode(Result result, CINode node) {
1813    node.key = Bytes.copy(result.getRow());
1814    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1815      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1816    } else {
1817      node.prev = NO_KEY;
1818    }
1819    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1820      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1821    } else {
1822      node.count = -1;
1823    }
1824    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1825      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1826    } else {
1827      node.client = "";
1828    }
1829    return node;
1830  }
1831
1832  protected IntegrationTestingUtility util;
1833
1834  @Override
1835  public void setUpCluster() throws Exception {
1836    util = getTestingUtil(getConf());
1837    boolean isDistributed = util.isDistributedCluster();
1838    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1839    if (!isDistributed) {
1840      util.startMiniMapReduceCluster();
1841    }
1842    this.setConf(util.getConfiguration());
1843  }
1844
1845  @Override
1846  public void cleanUpCluster() throws Exception {
1847    super.cleanUpCluster();
    // The mini MapReduce cluster is only started for a mini (non-distributed) cluster in
    // setUpCluster(), so it only needs to be shut down in that case.
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
1851  }
1852
1853  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
1855  }
1856
1857  @Test
  public void testContinuousIngest() throws Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
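    // i.e. below: one iteration, one mapper writing 2,000,000 nodes, verified with one reducer.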
1860    Configuration conf = getTestingUtil(getConf()).getConfiguration();
1861    if (isMultiUnevenColumnFamilies(getConf())) {
1862      // make sure per CF flush is on
1863      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
1864    }
1865    int ret =
1866        ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1867            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1868    org.junit.Assert.assertEquals(0, ret);
1869  }
1870
1871  private void usage() {
1872    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1873    printCommands();
1874  }
1875
1876  private void printCommands() {
1877    System.err.println("Commands:");
1878    System.err.println(" generator  Map only job that generates data.");
1879    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
1880    System.err.println("            look at the counts after running. See REFERENCED and");
1881    System.err.println("            UNREFERENCED are ok. Any UNDEFINED counts are bad. Do not run");
1882    System.err.println("            with the Generator.");
1883    System.err.println(" walker     " +
1884      "Standalone program that starts following a linked list & emits timing info.");
1885    System.err.println(" print      Standalone program that prints nodes in the linked list.");
1886    System.err.println(" delete     Standalone program that deletes a·single node.");
1887    System.err.println(" loop       Program to Loop through Generator and Verify steps");
1888    System.err.println(" clean      Program to clean all left over detritus.");
1889    System.err.println(" search     Search for missing keys.");
1890    System.err.println("");
1891    System.err.println("General options:");
1892    System.err.println(" -D"+ TABLE_NAME_KEY+ "=<tableName>");
1893    System.err.println("    Run using the <tableName> as the tablename.  Defaults to "
1894        + DEFAULT_TABLE_NAME);
1895    System.err.println(" -D"+ HBaseTestingUtility.REGIONS_PER_SERVER_KEY+ "=<# regions>");
1896    System.err.println("    Create table with presplit regions per server.  Defaults to "
1897        + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
1898
1899    System.err.println(" -DuseMob=<true|false>");
1900    System.err.println("    Create table so that the mob read/write path is forced.  " +
1901        "Defaults to false");
1902
1903    System.err.flush();
1904  }
1905
1906  @Override
1907  protected void processOptions(CommandLine cmd) {
1908    super.processOptions(cmd);
1909    String[] args = cmd.getArgs();
1910    //get the class, run with the conf
1911    if (args.length < 1) {
1912      printUsage(this.getClass().getSimpleName() +
1913        " <general options> COMMAND [<COMMAND options>]", "General options:", "");
1914      printCommands();
      // Have to throw an exception here to stop the processing. Looks ugly but gets the message across.
1916      throw new RuntimeException("Incorrect Number of args.");
1917    }
1918    toRun = args[0];
1919    otherArgs = Arrays.copyOfRange(args, 1, args.length);
1920  }
1921
1922  @Override
1923  public int runTestFromCommandLine() throws Exception {
1924    Tool tool = null;
1925    if (toRun.equalsIgnoreCase("Generator")) {
1926      tool = new Generator();
1927    } else if (toRun.equalsIgnoreCase("Verify")) {
1928      tool = new Verify();
1929    } else if (toRun.equalsIgnoreCase("Loop")) {
1930      Loop loop = new Loop();
1931      loop.it = this;
1932      tool = loop;
1933    } else if (toRun.equalsIgnoreCase("Walker")) {
1934      tool = new Walker();
1935    } else if (toRun.equalsIgnoreCase("Print")) {
1936      tool = new Print();
1937    } else if (toRun.equalsIgnoreCase("Delete")) {
1938      tool = new Delete();
1939    } else if (toRun.equalsIgnoreCase("Clean")) {
1940      tool = new Clean();
1941    } else if (toRun.equalsIgnoreCase("Search")) {
1942      tool = new Search();
1943    } else {
1944      usage();
1945      throw new RuntimeException("Unknown arg");
1946    }
1947
1948    return ToolRunner.run(getConf(), tool, otherArgs);
1949  }
1950
1951  @Override
1952  public TableName getTablename() {
1953    Configuration c = getConf();
1954    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1955  }
1956
1957  @Override
1958  protected Set<String> getColumnFamilies() {
1959    if (isMultiUnevenColumnFamilies(getConf())) {
1960      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
1961        Bytes.toString(TINY_FAMILY_NAME));
1962    } else {
1963      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1964    }
1965  }
1966
1967  private static void setJobConf(Job job, int numMappers, long numNodes,
1968      Integer width, Integer wrapMultiplier, Integer numWalkers) {
1969    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1970    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1971    if (width != null) {
1972      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1973    }
1974    if (wrapMultiplier != null) {
1975      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1976    }
1977    if (numWalkers != null) {
1978      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
1979    }
1980  }
1981
1982  public static void setJobScannerConf(Job job) {
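    // When scanner activity logging is enabled, have the table record reader report progress
    // only every 100,000 rows so long scans do not flood the task logs.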
1983    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1984  }
1985
1986  public static void main(String[] args) throws Exception {
1987    Configuration conf = HBaseConfiguration.create();
1988    IntegrationTestingUtility.setUseDistributedCluster(conf);
1989    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1990    System.exit(ret);
1991  }
1992}