001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.test;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.security.SecureRandom;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.EnumSet;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Random;
033import java.util.Set;
034import java.util.SortedSet;
035import java.util.TreeSet;
036import java.util.UUID;
037import java.util.concurrent.ThreadLocalRandom;
038import java.util.concurrent.atomic.AtomicInteger;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.conf.Configured;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.LocatedFileStatus;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.fs.RemoteIterator;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.ClusterMetrics.Option;
047import org.apache.hadoop.hbase.HBaseConfiguration;
048import org.apache.hadoop.hbase.HBaseTestingUtility;
049import org.apache.hadoop.hbase.HColumnDescriptor;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.HRegionLocation;
052import org.apache.hadoop.hbase.HTableDescriptor;
053import org.apache.hadoop.hbase.IntegrationTestBase;
054import org.apache.hadoop.hbase.IntegrationTestingUtility;
055import org.apache.hadoop.hbase.MasterNotRunningException;
056import org.apache.hadoop.hbase.TableName;
057import org.apache.hadoop.hbase.client.Admin;
058import org.apache.hadoop.hbase.client.BufferedMutator;
059import org.apache.hadoop.hbase.client.BufferedMutatorParams;
060import org.apache.hadoop.hbase.client.Connection;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Get;
063import org.apache.hadoop.hbase.client.Mutation;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.RegionLocator;
066import org.apache.hadoop.hbase.client.Result;
067import org.apache.hadoop.hbase.client.ResultScanner;
068import org.apache.hadoop.hbase.client.Scan;
069import org.apache.hadoop.hbase.client.ScannerCallable;
070import org.apache.hadoop.hbase.client.Table;
071import org.apache.hadoop.hbase.fs.HFileSystem;
072import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
073import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
074import org.apache.hadoop.hbase.mapreduce.TableMapper;
075import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
076import org.apache.hadoop.hbase.mapreduce.WALPlayer;
077import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
078import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
079import org.apache.hadoop.hbase.testclassification.IntegrationTests;
080import org.apache.hadoop.hbase.util.AbstractHBaseTool;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.hbase.util.CommonFSUtils;
083import org.apache.hadoop.hbase.util.RegionSplitter;
084import org.apache.hadoop.hbase.wal.WALEdit;
085import org.apache.hadoop.hbase.wal.WALKey;
086import org.apache.hadoop.io.BytesWritable;
087import org.apache.hadoop.io.NullWritable;
088import org.apache.hadoop.io.Writable;
089import org.apache.hadoop.mapreduce.Counter;
090import org.apache.hadoop.mapreduce.CounterGroup;
091import org.apache.hadoop.mapreduce.Counters;
092import org.apache.hadoop.mapreduce.InputFormat;
093import org.apache.hadoop.mapreduce.InputSplit;
094import org.apache.hadoop.mapreduce.Job;
095import org.apache.hadoop.mapreduce.JobContext;
096import org.apache.hadoop.mapreduce.Mapper;
097import org.apache.hadoop.mapreduce.RecordReader;
098import org.apache.hadoop.mapreduce.Reducer;
099import org.apache.hadoop.mapreduce.TaskAttemptContext;
100import org.apache.hadoop.mapreduce.TaskAttemptID;
101import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
102import org.apache.hadoop.mapreduce.lib.input.FileSplit;
103import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
104import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
105import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
106import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
107import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
108import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
109import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
110import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
111import org.apache.hadoop.util.Tool;
112import org.apache.hadoop.util.ToolRunner;
113import org.junit.Test;
114import org.junit.experimental.categories.Category;
115import org.slf4j.Logger;
116import org.slf4j.LoggerFactory;
117import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
118import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
119import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
120import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
121import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
122import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
123
/**
 * This is an integration test borrowed from goraci, written by Keith Turner,
 * which is in turn inspired by the Accumulo test called continuous ingest (ci).
 * The original source code can be found here:
 * https://github.com/keith-turner/goraci
 * https://github.com/enis/goraci/
 *
 * Apache Accumulo has a simple test suite that verifies that data is not
 * lost at scale. This test suite is called continuous ingest. This test runs
 * many ingest clients that continually create linked lists containing 25
 * million nodes. At some point the clients are stopped and a map reduce job is
 * run to ensure no linked list has a hole. A hole indicates data was lost.
 *
 * The nodes in the linked list are random. This causes each linked list to
 * spread across the table. Therefore if one part of a table loses data, the loss
 * will be detected by references in another part of the table.
 *
 * THE ANATOMY OF THE TEST
 *
 * Below is a rough sketch of how data is written. For specific details look at
 * the Generator code.
 *
 * 1. Write out 1 million nodes
 * 2. Flush the client
 * 3. Write out 1 million nodes that reference the previous million
 * 4. If this is the 25th set of 1 million nodes, then update the 1st set of a
 *    million to point to the last
 * 5. Go to 1
 *
 * The key is that nodes only reference flushed nodes. Therefore a node should
 * never reference a missing node, even if the ingest client is killed at any
 * point in time.
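 *
 * A minimal sketch of that write loop (illustrative pseudocode only; the real logic, including
 * the column layout and optional concurrent walkers, lives in Generator.GeneratorMapper):
 * <pre>
 * byte[][] first = null, prev = null;
 * while (moreNodesToWrite()) {
 *   byte[][] current = nextBatchOfRandomKeys(width);  // step 1: a fresh batch of nodes
 *   persistAndFlush(current, prev);                   // steps 2-3: each node references prev batch
 *   if (first == null) first = current;
 *   prev = current;
 *   if (batchesWritten() % wrapMultiplier == 0) {     // step 4: close the loop
 *     persistAndFlush(first, prev);                   // first batch now points at the last one
 *     first = null;
 *     prev = null;
 *   }
 * }
 * </pre>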
 *
 * When running this test suite with Accumulo there is a script running in
 * parallel called the Agitator that randomly and continuously kills server
 * processes. The outcome was that many data loss bugs were found in Accumulo
 * by doing this. This test suite can also help find bugs that impact uptime
 * and stability when run for days or weeks.
 *
 * This test suite consists of the following:
 * - a few Java programs
 * - a little helper script to run the Java programs
 * - a maven script to build it
 *
 * When generating data, it's best to have each map task generate a multiple of
 * 25 million. The reason for this is that circular linked lists are generated
 * every 25M nodes (the default width of 1,000,000 times the default wrap multiplier
 * of 25). Not generating a multiple of 25M will result in some nodes in the
 * linked list not having references. The loss of an unreferenced node cannot
 * be detected.
 *
 * Below is a description of the Java programs:
 *
 * Generator - A map only job that generates data. As stated previously, it's best to generate
 * data in multiples of 25M. An option is also available to allow concurrent walkers to select
 * and walk random flushed loops during this phase.
 *
 * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED
 * and UNREFERENCED are ok, any UNDEFINED counts are bad. Do not run at the same
 * time as the Generator.
 *
 * Walker - A standalone program that starts following a linked list and emits timing info.
 *
 * Print - A standalone program that prints nodes in the linked list
 *
 * Delete - A standalone program that deletes a single node
 *
 * This class can be run as a unit test, as an integration test, or from the command line.
 *
 * ex:
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
 *    loop 2 1 100000 /temp 1 1000 50 1 0
 *
 */
193@Category(IntegrationTests.class)
194public class IntegrationTestBigLinkedList extends IntegrationTestBase {
195  protected static final byte[] NO_KEY = new byte[1];
196
197  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
198
199  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
200
201  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
202  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
203  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
204
205  //link to the id of the prev node in the linked list
206  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
207
208  //identifier of the mapred task that generated this row
209  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
210
211  //the id of the row within the same client.
212  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
213
214  /** How many rows to write per map task. This has to be a multiple of 25M */
215  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
216    = "IntegrationTestBigLinkedList.generator.num_rows";
217
218  private static final String GENERATOR_NUM_MAPPERS_KEY
219    = "IntegrationTestBigLinkedList.generator.map.tasks";
220
221  private static final String GENERATOR_WIDTH_KEY
222    = "IntegrationTestBigLinkedList.generator.width";
223
224  private static final String GENERATOR_WRAP_KEY
225    = "IntegrationTestBigLinkedList.generator.wrap";
226
227  private static final String CONCURRENT_WALKER_KEY
228    = "IntegrationTestBigLinkedList.generator.concurrentwalkers";
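
  // Illustrative only: these generator.* keys are read from the job configuration, so (assuming
  // the usual ToolRunner/GenericOptionsParser handling of -D options) they can also be supplied
  // on the command line, e.g.
  //   -DIntegrationTestBigLinkedList.generator.width=1000000
  //   -DIntegrationTestBigLinkedList.generator.wrap=25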
229
230  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
231
232  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
233
234  private static final int WIDTH_DEFAULT = 1000000;
235  private static final int WRAP_DEFAULT = 25;
236  private static final int ROWKEY_LENGTH = 16;
237
238  private static final int CONCURRENT_WALKER_DEFAULT = 0;
239
240  protected String toRun;
241  protected String[] otherArgs;
242
243  static class CINode {
244    byte[] key;
245    byte[] prev;
246    String client;
247    long count;
248  }
249
250  /**
   * A Map only job that generates random linked lists and stores them.
252   */
253  static class Generator extends Configured implements Tool {
254
255    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
256
    /**
     * Set this configuration property if you want to test that single-column-family flush works.
     * If set, we will add a big column family and a small column family on either side of the
     * usual ITBLL 'meta' column family. When we write out the ITBLL data, we will also add to the
     * big column family a value bigger than the ITBLL one, and to the small family something way
     * smaller. The idea is that when flush-by-column-family rather than flush-by-region is
     * enabled, we can see if ITBLL is broken in any way. Here is how you would pass it:
     * <p>
     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
     */
268    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
269        "generator.multiple.columnfamilies";
270
271    public static enum Counts {
272      SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION
273    }
274
    public static final String USAGE = "Usage : " + Generator.class.getSimpleName() +
        " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" +
        " <num walker threads>] \n" +
        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
        "walkers will verify random flushed loops during generation.";
280
281    public Job job;
282
283    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
284      static class GeneratorInputSplit extends InputSplit implements Writable {
285        @Override
286        public long getLength() throws IOException, InterruptedException {
287          return 1;
288        }
289        @Override
290        public String[] getLocations() throws IOException, InterruptedException {
291          return new String[0];
292        }
293        @Override
294        public void readFields(DataInput arg0) throws IOException {
295        }
296        @Override
297        public void write(DataOutput arg0) throws IOException {
298        }
299      }
300
301      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
302        private long count;
303        private long numNodes;
304        private Random rand;
305
306        @Override
307        public void close() throws IOException {
308        }
309
310        @Override
311        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
312          byte[] bytes = new byte[ROWKEY_LENGTH];
313          rand.nextBytes(bytes);
314          return new BytesWritable(bytes);
315        }
316
317        @Override
318        public NullWritable getCurrentValue() throws IOException, InterruptedException {
319          return NullWritable.get();
320        }
321
322        @Override
323        public float getProgress() throws IOException, InterruptedException {
324          return (float)(count / (double)numNodes);
325        }
326
327        @Override
328        public void initialize(InputSplit arg0, TaskAttemptContext context)
329            throws IOException, InterruptedException {
330          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
331          // Use SecureRandom to avoid issue described in HBASE-13382.
332          rand = new SecureRandom();
333        }
334
335        @Override
336        public boolean nextKeyValue() throws IOException, InterruptedException {
337          return count++ < numNodes;
338        }
339
340      }
341
342      @Override
343      public RecordReader<BytesWritable,NullWritable> createRecordReader(
344          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
345        GeneratorRecordReader rr = new GeneratorRecordReader();
346        rr.initialize(split, context);
347        return rr;
348      }
349
350      @Override
351      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
352        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
353
354        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);
355
356        for (int i = 0; i < numMappers; i++) {
357          splits.add(new GeneratorInputSplit());
358        }
359
360        return splits;
361      }
362    }
363
364    /** Ensure output files from prev-job go to map inputs for current job */
365    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
366      @Override
367      protected boolean isSplitable(JobContext context, Path filename) {
368        return false;
369      }
370    }
371
372    /**
373     * Some ASCII art time:
374     * <p>
375     * [ . . . ] represents one batch of random longs of length WIDTH
376     * <pre>
377     *                _________________________
378     *               |                  ______ |
379     *               |                 |      ||
380     *             .-+-----------------+-----.||
381     *             | |                 |     |||
382     * first   = [ . . . . . . . . . . . ]   |||
383     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
384     *             | | | | | | | | | | |     |||
385     * prev    = [ . . . . . . . . . . . ]   |||
386     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
387     *             | | | | | | | | | | |     |||
388     * current = [ . . . . . . . . . . . ]   |||
389     *                                       |||
390     * ...                                   |||
391     *                                       |||
392     * last    = [ . . . . . . . . . . . ]   |||
393     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
394     *             |                 |________||
395     *             |___________________________|
396     * </pre>
397     */
398
399    static class GeneratorMapper
400      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
401
402      byte[][] first = null;
403      byte[][] prev = null;
404      byte[][] current = null;
405      byte[] id;
406      long count = 0;
407      int i;
408      BufferedMutator mutator;
409      Connection connection;
410      long numNodes;
411      long wrap;
412      int width;
413      boolean multipleUnevenColumnFamilies;
414      byte[] tinyValue = new byte[] { 't' };
415      byte[] bigValue = null;
416      Configuration conf;
417
418      volatile boolean walkersStop;
419      int numWalkers;
420      volatile List<Long> flushedLoops = new ArrayList<>();
421      List<Thread> walkers = new ArrayList<>();
422
423      @Override
424      protected void setup(Context context) throws IOException, InterruptedException {
425        id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
426        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
427        instantiateHTable();
428        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
429        current = new byte[this.width][];
430        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
431        this.wrap = (long)wrapMultiplier * width;
432        this.numNodes = context.getConfiguration().getLong(
433            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
434        if (this.numNodes < this.wrap) {
435          this.wrap = this.numNodes;
436        }
437        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
438        this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
439        this.walkersStop = false;
440        this.conf = context.getConfiguration();
441      }
442
443      protected void instantiateHTable() throws IOException {
444        mutator = connection.getBufferedMutator(
445            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
446                .writeBufferSize(4 * 1024 * 1024));
447      }
448
449      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
451        joinWalkers();
452        mutator.close();
453        connection.close();
454      }
455
456      @Override
457      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
458        current[i] = new byte[key.getLength()];
459        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
460        if (++i == current.length) {
461          LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" +
462            Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) +
463            ", i=" + i);
464          persist(output, count, prev, current, id);
465          i = 0;
466
467          if (first == null) {
468            first = current;
469          }
470          prev = current;
471          current = new byte[this.width][];
472
473          count += current.length;
474          output.setStatus("Count " + count);
475
476          if (count % wrap == 0) {
            // This block of code turns the 1 million linked lists of length 25 into one giant
            // circular linked list of 25 million nodes.
479            circularLeftShift(first);
480            persist(output, -1, prev, first, null);
481            // At this point the entire loop has been flushed so we can add one of its nodes to the
482            // concurrent walker
483            if (numWalkers > 0) {
484              addFlushed(key.getBytes());
485              if (walkers.isEmpty()) {
486                startWalkers(numWalkers, conf, output);
487              }
488            }
489            first = null;
490            prev = null;
491          }
492        }
493      }
494
495      private static <T> void circularLeftShift(T[] first) {
496        T ez = first[0];
497        System.arraycopy(first, 1, first, 0, first.length - 1);
498        first[first.length - 1] = ez;
499      }
500
501      private void addFlushed(byte[] rowKey) {
502        synchronized (flushedLoops) {
503          flushedLoops.add(Bytes.toLong(rowKey));
504          flushedLoops.notifyAll();
505        }
506      }
507
508      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
509          throws IOException {
510        for (int i = 0; i < current.length; i++) {
511
512          if (i % 100 == 0) {
513            // Tickle progress every so often else maprunner will think us hung
514            output.progress();
515          }
516
517          Put put = new Put(current[i]);
518          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
519
520          if (count >= 0) {
521            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
522          }
523          if (id != null) {
524            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
525          }
526          // See if we are to write multiple columns.
527          if (this.multipleUnevenColumnFamilies) {
528            // Use any column name.
529            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
530            // If we've not allocated bigValue, do it now. Reuse same value each time.
531            if (this.bigValue == null) {
532              this.bigValue = new byte[current[i].length * 10];
533              ThreadLocalRandom.current().nextBytes(this.bigValue);
534            }
535            // Use any column name.
536            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
537          }
538          mutator.mutate(put);
539        }
540
541        mutator.flush();
542      }
543
544      private void startWalkers(int numWalkers, Configuration conf, Context context) {
545        LOG.info("Starting " + numWalkers + " concurrent walkers");
546        for (int i = 0; i < numWalkers; i++) {
547          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
548          walker.start();
549          walkers.add(walker);
550        }
551      }
552
553      private void joinWalkers() {
554        walkersStop = true;
555        synchronized (flushedLoops) {
556          flushedLoops.notifyAll();
557        }
558        for (Thread walker : walkers) {
559          try {
560            walker.join();
561          } catch (InterruptedException e) {
562            // no-op
563          }
564        }
565      }
566
      /**
       * Randomly selects and walks a flushed loop concurrently with the Generator Mapper by
       * spawning ConcurrentWalkers with specified start nodes. These ConcurrentWalkers are
       * configured to only log erroneous nodes.
       */
572
573      public class ContinuousConcurrentWalker implements Runnable {
574
575        ConcurrentWalker walker;
576        Configuration conf;
577        Context context;
578        Random rand;
579
580        public ContinuousConcurrentWalker(Configuration conf, Context context) {
581          this.conf = conf;
582          this.context = context;
583          rand = new Random();
584        }
585
586        @Override
587        public void run() {
588          while (!walkersStop) {
589            try {
590              long node = selectLoop();
591              try {
592                walkLoop(node);
593              } catch (IOException e) {
594                context.getCounter(Counts.IOEXCEPTION).increment(1l);
595                return;
596              }
597            } catch (InterruptedException e) {
598              return;
599            }
600          }
601        }
602
603        private void walkLoop(long node) throws IOException {
604          walker = new ConcurrentWalker(context);
605          walker.setConf(conf);
606          walker.run(node, wrap);
607        }
608
        private long selectLoop() throws InterruptedException {
610          synchronized (flushedLoops) {
611            while (flushedLoops.isEmpty() && !walkersStop) {
612              flushedLoops.wait();
613            }
614            if (walkersStop) {
615              throw new InterruptedException();
616            }
617            return flushedLoops.get(rand.nextInt(flushedLoops.size()));
618          }
619        }
620      }
621
622      public static class ConcurrentWalker extends WalkerBase {
623
624        Context context;
625
626        public ConcurrentWalker(Context context) {this.context = context;}
627
628        public void run(long startKeyIn, long maxQueriesIn) throws IOException {
629
630          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
631          byte[] startKey = Bytes.toBytes(startKeyIn);
632
633          Connection connection = ConnectionFactory.createConnection(getConf());
634          Table table = connection.getTable(getTableName(getConf()));
635          long numQueries = 0;
636          // If isSpecificStart is set, only walk one list from that particular node.
637          // Note that in case of circular (or P-shaped) list it will walk forever, as is
638          // the case in normal run without startKey.
639
640          CINode node = findStartNode(table, startKey);
641          if (node == null) {
642            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
643            throw new IOException("Start node not found: " + startKeyIn);
644          }
645          while (numQueries < maxQueries) {
646            numQueries++;
647            byte[] prev = node.prev;
648            long t1 = System.currentTimeMillis();
649            node = getNode(prev, table, node);
650            long t2 = System.currentTimeMillis();
651            if (node == null) {
652              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
653              context.getCounter(Counts.UNDEFINED).increment(1l);
654            } else if (node.prev.length == NO_KEY.length) {
655              LOG.error("ConcurrentWalker found TERMINATING NODE: " +
656                  Bytes.toStringBinary(node.key));
657              context.getCounter(Counts.TERMINATING).increment(1l);
658            } else {
659              // Increment for successful walk
660              context.getCounter(Counts.SUCCESS).increment(1l);
661            }
662          }
663          table.close();
664          connection.close();
665        }
666      }
667    }
668
669    @Override
670    public int run(String[] args) throws Exception {
671      if (args.length < 3) {
672        System.err.println(USAGE);
673        return 1;
674      }
675      try {
676        int numMappers = Integer.parseInt(args[0]);
677        long numNodes = Long.parseLong(args[1]);
678        Path tmpOutput = new Path(args[2]);
679        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
680        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
681        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
682        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
683      } catch (NumberFormatException e) {
684        System.err.println("Parsing generator arguments failed: " + e.getMessage());
685        System.err.println(USAGE);
686        return 1;
687      }
688    }
689
690    protected void createSchema() throws IOException {
691      Configuration conf = getConf();
692      TableName tableName = getTableName(conf);
693      try (Connection conn = ConnectionFactory.createConnection(conf);
694          Admin admin = conn.getAdmin()) {
695        if (!admin.tableExists(tableName)) {
696          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
697          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
698          // Always add these families. Just skip writing to them when we do not test per CF flush.
699          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
700          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
701          // if -DuseMob=true force all data through mob path.
702          if (conf.getBoolean("useMob", false)) {
703            for (HColumnDescriptor hcd : htd.getColumnFamilies() ) {
704              hcd.setMobEnabled(true);
705              hcd.setMobThreshold(4);
706            }
707          }
708
709          // If we want to pre-split compute how many splits.
710          if (conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
711              HBaseTestingUtility.PRESPLIT_TEST_TABLE)) {
712            int numberOfServers =
713                admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
714                    .getLiveServerMetrics().size();
715            if (numberOfServers == 0) {
716              throw new IllegalStateException("No live regionservers");
717            }
718            int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
719                HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
720            int totalNumberOfRegions = numberOfServers * regionsPerServer;
721            LOG.info("Number of live regionservers: " + numberOfServers + ", " +
722                "pre-splitting table into " + totalNumberOfRegions + " regions " +
723                "(default regions per server: " + regionsPerServer + ")");
724
725
726            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
727
728            admin.createTable(htd, splits);
729          } else {
730            // Looks like we're just letting things play out.
            // Create a table with one region by default.
732            // This will make the splitting work hard.
733            admin.createTable(htd);
734          }
735        }
736      } catch (MasterNotRunningException e) {
737        LOG.error("Master not running", e);
738        throw new IOException(e);
739      }
740    }
741
742    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
743        Integer width, Integer wrapMultiplier, Integer numWalkers)
744        throws Exception {
745      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
746          + ", numNodes=" + numNodes);
747      Job job = Job.getInstance(getConf());
748
749      job.setJobName("Random Input Generator");
750      job.setNumReduceTasks(0);
751      job.setJarByClass(getClass());
752
753      job.setInputFormatClass(GeneratorInputFormat.class);
754      job.setOutputKeyClass(BytesWritable.class);
755      job.setOutputValueClass(NullWritable.class);
756
757      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
758
759      job.setMapperClass(Mapper.class); //identity mapper
760
761      FileOutputFormat.setOutputPath(job, tmpOutput);
762      job.setOutputFormatClass(SequenceFileOutputFormat.class);
763
764      boolean success = jobCompletion(job);
765
766      return success ? 0 : 1;
767    }
768
769    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
770        Integer width, Integer wrapMultiplier, Integer numWalkers)
771        throws Exception {
772      LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
773      createSchema();
774      job = Job.getInstance(getConf());
775
776      job.setJobName("Link Generator");
777      job.setNumReduceTasks(0);
778      job.setJarByClass(getClass());
779
780      FileInputFormat.setInputPaths(job, tmpOutput);
781      job.setInputFormatClass(OneFilePerMapperSFIF.class);
782      job.setOutputKeyClass(NullWritable.class);
783      job.setOutputValueClass(NullWritable.class);
784
785      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
786
787      setMapperForGenerator(job);
788
789      job.setOutputFormatClass(NullOutputFormat.class);
790
791      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
792      TableMapReduceUtil.addDependencyJars(job);
793      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
794                                                     AbstractHBaseTool.class);
795      TableMapReduceUtil.initCredentials(job);
796
797      boolean success = jobCompletion(job);
798
799      return success ? 0 : 1;
800    }
801
802    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
803        ClassNotFoundException {
804      boolean success = job.waitForCompletion(true);
805      return success;
806    }
807
808    protected void setMapperForGenerator(Job job) {
809      job.setMapperClass(GeneratorMapper.class);
810    }
811
812    public int run(int numMappers, long numNodes, Path tmpOutput,
813        Integer width, Integer wrapMultiplier, Integer numWalkers)
814        throws Exception {
815      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier,
816          numWalkers);
817      if (ret > 0) {
818        return ret;
819      }
820      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
821    }
822
823    public boolean verify() {
824      try {
825        Counters counters = job.getCounters();
826        if (counters == null) {
827          LOG.info("Counters object was null, Generator verification cannot be performed."
828              + " This is commonly a result of insufficient YARN configuration.");
829          return false;
830        }
831
832        if (counters.findCounter(Counts.TERMINATING).getValue() > 0 ||
833            counters.findCounter(Counts.UNDEFINED).getValue() > 0 ||
834            counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) {
835          LOG.error("Concurrent walker failed to verify during Generation phase");
836          LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue());
837          LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue());
838          LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue());
839          return false;
840        }
841      } catch (IOException e) {
842        LOG.info("Generator verification could not find counter");
843        return false;
844      }
845      return true;
846    }
847  }
848
  /**
   * Tool to search for missing rows in WALs and hfiles.
   * Pass in a file or dir of keys to search for. The key file must have been written by the
   * Verify step (we depend on the format it writes out). We'll read the keys in and then search
   * in the hbase WALs and oldWALs dirs (some of this is TODO).
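   * <p>
   * Example invocation (illustrative; the keys dir is the output dir of a previous Verify run,
   * and the mapper count is made up):
   * <pre>
   * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList search /tmp/itbll-verify 4
   * </pre>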
854   */
855  static class Search extends Configured implements Tool {
856    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
857    protected Job job;
858
859    private static void printUsage(final String error) {
860      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
861      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
862    }
863
864    @Override
865    public int run(String[] args) throws Exception {
866      if (args.length < 1 || args.length > 2) {
867        printUsage(null);
868        return 1;
869      }
870      Path inputDir = new Path(args[0]);
871      int numMappers = 1;
872      if (args.length > 1) {
873        numMappers = Integer.parseInt(args[1]);
874      }
875      return run(inputDir, numMappers);
876    }
877
878    /**
879     * WALPlayer override that searches for keys loaded in the setup.
880     */
881    public static class WALSearcher extends WALPlayer {
882      public WALSearcher(Configuration conf) {
883        super(conf);
884      }
885
886      /**
887       * The actual searcher mapper.
888       */
889      public static class WALMapperSearcher extends WALMapper {
890        private SortedSet<byte []> keysToFind;
891        private AtomicInteger rows = new AtomicInteger(0);
892
893        @Override
894        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
895            throws IOException {
896          super.setup(context);
897          try {
898            this.keysToFind = readKeysToSearch(context.getConfiguration());
899            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
900          } catch (InterruptedException e) {
901            throw new InterruptedIOException(e.toString());
902          }
903        }
904
905        @Override
906        protected boolean filter(Context context, Cell cell) {
907          // TODO: Can I do a better compare than this copying out key?
908          byte [] row = new byte [cell.getRowLength()];
909          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
910          boolean b = this.keysToFind.contains(row);
911          if (b) {
912            String keyStr = Bytes.toStringBinary(row);
913            try {
914              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
915            } catch (IOException|InterruptedException e) {
916              LOG.warn(e.toString(), e);
917            }
918            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
919              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
920            }
921            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
922          }
923          return b;
924        }
925      }
926
927      // Put in place the above WALMapperSearcher.
928      @Override
929      public Job createSubmittableJob(String[] args) throws IOException {
930        Job job = super.createSubmittableJob(args);
931        // Call my class instead.
932        job.setJarByClass(WALMapperSearcher.class);
933        job.setMapperClass(WALMapperSearcher.class);
934        job.setOutputFormatClass(NullOutputFormat.class);
935        return job;
936      }
937    }
938
939    static final String FOUND_GROUP_KEY = "Found";
940    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
941
942    public int run(Path inputDir, int numMappers) throws Exception {
943      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
944      SortedSet<byte []> keys = readKeysToSearch(getConf());
945      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
946      LOG.info("Count of keys to find: " + keys.size());
947      for(byte [] key: keys)  LOG.info("Key: " + Bytes.toStringBinary(key));
948      // Now read all WALs. In two dirs. Presumes certain layout.
949      Path walsDir = new Path(
950          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
951      Path oldWalsDir = new Path(
952          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
953      LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
954        " against " + getConf().get(HConstants.HBASE_DIR));
955      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
956          new String [] {walsDir.toString(), ""});
957      if (ret != 0) {
958        return ret;
959      }
960      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
961          new String [] {oldWalsDir.toString(), ""});
962    }
963
964    static SortedSet<byte []> readKeysToSearch(final Configuration conf)
965    throws IOException, InterruptedException {
966      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
967      FileSystem fs = FileSystem.get(conf);
968      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
969      if (!fs.exists(keysInputDir)) {
970        throw new FileNotFoundException(keysInputDir.toString());
971      }
972      if (!fs.isDirectory(keysInputDir)) {
973        throw new UnsupportedOperationException("TODO");
974      } else {
975        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
976        while(iterator.hasNext()) {
977          LocatedFileStatus keyFileStatus = iterator.next();
978          // Skip "_SUCCESS" file.
979          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
980          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
981        }
982      }
983      return result;
984    }
985
986    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
987        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
988        InterruptedException {
989      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
990      // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
991      // what is missing.
992      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
993      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
994          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
995        InputSplit is =
996          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
997        rr.initialize(is, context);
998        while (rr.nextKeyValue()) {
999          rr.getCurrentKey();
1000          BytesWritable bw = rr.getCurrentValue();
1001          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
1002            byte[] key = new byte[rr.getCurrentKey().getLength()];
1003            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
1004                .getLength());
1005            result.add(key);
1006          }
1007        }
1008      }
1009      return result;
1010    }
1011  }
1012
1013  /**
1014   * A Map Reduce job that verifies that the linked lists generated by
1015   * {@link Generator} do not have any holes.
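   * <p>
   * Example invocation (illustrative; the output dir and reducer count are made up):
   * <pre>
   * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList verify /tmp/itbll-verify 16
   * </pre>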
1016   */
1017  static class Verify extends Configured implements Tool {
1018
1019    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
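    // Single-byte markers emitted by VerifyMapper once per defined row. The reducer distinguishes
    // them from reference values (full row keys) purely by length; a leading 1 additionally flags
    // a row that lost its big/tiny column families.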
1020    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
1021    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
1022
1023    protected Job job;
1024
1025    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
1026      private BytesWritable row = new BytesWritable();
1027      private BytesWritable ref = new BytesWritable();
1028
1029      private boolean multipleUnevenColumnFamilies;
1030
1031      @Override
1032      protected void setup(
1033          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
1034          throws IOException, InterruptedException {
1035        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
1036      }
1037
1038      @Override
1039      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
1041        byte[] rowKey = key.get();
1042        row.set(rowKey, 0, rowKey.length);
1043        if (multipleUnevenColumnFamilies
1044            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
1045              TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
1046          context.write(row, DEF_LOST_FAMILIES);
1047        } else {
1048          context.write(row, DEF);
1049        }
1050        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
1051        if (prev != null && prev.length > 0) {
1052          ref.set(prev, 0, prev.length);
1053          context.write(ref, row);
1054        } else {
1055          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
1056        }
1057      }
1058    }
1059
1060    /**
     * Don't change the order of these enums. Their ordinals are used as the type flag when we
     * emit problems found from the reducer.
1063     */
1064    public static enum Counts {
1065      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
1066      LOST_FAMILIES
1067    }
1068
    /**
     * Per reducer, we output problem rows as byte arrays so they can be used as input for
     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a two-byte flag
     * saying what sort of emission it is. The flag is the Counts enum ordinal written as a short.
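     * <p>
     * For example (a sketch of the layout, not literal output), an undefined (missing) node is
     * emitted roughly as:
     * <pre>
     * key   = [row key of the missing node]
     * value = [2-byte short: Counts.UNDEFINED.ordinal()][row key of a node that references it]
     * </pre>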
1073     */
1074    public static class VerifyReducer extends
1075        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
1076      private ArrayList<byte[]> refs = new ArrayList<>();
1077      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
1078        Counts.UNREFERENCED.ordinal(), new byte[] {}));
1079      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
1080        Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
1081
1082      private AtomicInteger rows = new AtomicInteger(0);
1083      private Connection connection;
1084
1085      @Override
1086      protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1087      throws IOException, InterruptedException {
1088        super.setup(context);
1089        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
1090      }
1091
1092      @Override
1093      protected void cleanup(
1094          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1095          throws IOException, InterruptedException {
1096        if (this.connection != null) {
1097          this.connection.close();
1098        }
1099        super.cleanup(context);
1100      }
1101
      /**
       * @param ordinal the Counts ordinal to encode as the type flag
       * @param r the row bytes to append after the flag
       * @return a new byte array that has <code>ordinal</code> as a prefix on the front, taking up
       * Bytes.SIZEOF_SHORT bytes, followed by <code>r</code>
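       * <p>
       * Illustrative round trip with a made-up row value (not taken from the test itself):
       * <pre>
       * byte[] row = Bytes.toBytes("some-row");
       * byte[] flagged = addPrefixFlag(Counts.UNDEFINED.ordinal(), row);
       * // whichType(flagged) == Counts.UNDEFINED
       * // Bytes.equals(getRowOnly(new BytesWritable(flagged)), row)
       * </pre>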
1107       */
1108      public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
1109        byte[] prefix = Bytes.toBytes((short)ordinal);
1110        if (prefix.length != Bytes.SIZEOF_SHORT) {
1111          throw new RuntimeException("Unexpected size: " + prefix.length);
1112        }
1113        byte[] result = new byte[prefix.length + r.length];
1114        System.arraycopy(prefix, 0, result, 0, prefix.length);
1115        System.arraycopy(r, 0, result, prefix.length, r.length);
1116        return result;
1117      }
1118
1119      /**
       * @param bs flagged row bytes as written by the reducer
       * @return Type from the Counts enum of this row. Reads the prefix added by
       * {@link #addPrefixFlag(int, byte[])}
1123       */
1124      public static Counts whichType(final byte [] bs) {
1125        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
1126        return Counts.values()[ordinal];
1127      }
1128
1129      /**
       * @param bw flagged row bytes as written by the reducer
1131       * @return Row bytes minus the type flag.
1132       */
1133      public static byte[] getRowOnly(BytesWritable bw) {
1134        byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
1135        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
1136        return bytes;
1137      }
1138
1139      @Override
1140      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
1141          throws IOException, InterruptedException {
1142        int defCount = 0;
1143        boolean lostFamilies = false;
1144        refs.clear();
1145        for (BytesWritable type : values) {
1146          if (type.getLength() == DEF.getLength()) {
1147            defCount++;
1148            if (type.getBytes()[0] == 1) {
1149              lostFamilies = true;
1150            }
1151          } else {
1152            byte[] bytes = new byte[type.getLength()];
1153            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
1154            refs.add(bytes);
1155          }
1156        }
1157
1158        // TODO check for more than one def, should not happen
1159        StringBuilder refsSb = null;
1160        String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1161        if (defCount == 0 || refs.size() != 1) {
1162          refsSb = dumpExtraInfoOnRefs(key, context, refs);
1163          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
1164            (refsSb != null? refsSb.toString(): ""));
1165        }
1166        if (lostFamilies) {
1167          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
1168          context.getCounter(Counts.LOST_FAMILIES).increment(1);
1169          context.write(key, LOSTFAM);
1170        }
1171
1172        if (defCount == 0 && refs.size() > 0) {
1173          // This is bad, found a node that is referenced but not defined. It must have been
1174          // lost, emit some info about this node for debugging purposes.
          // Write out a line per reference. If more than one, flag it.
1176          for (int i = 0; i < refs.size(); i++) {
1177            byte[] bs = refs.get(i);
1178            int ordinal;
1179            if (i <= 0) {
1180              ordinal = Counts.UNDEFINED.ordinal();
1181              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1182              context.getCounter(Counts.UNDEFINED).increment(1);
1183            } else {
1184              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
1185              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1186            }
1187          }
1188          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1189            // Print out missing row; doing get on reference gives info on when the referencer
1190            // was added which can help a little debugging. This info is only available in mapper
1191            // output -- the 'Linked List error Key...' log message above. What we emit here is
1192            // useless for debugging.
1193            context.getCounter("undef", keyString).increment(1);
1194          }
1195        } else if (defCount > 0 && refs.isEmpty()) {
1196          // node is defined but not referenced
1197          context.write(key, UNREF);
1198          context.getCounter(Counts.UNREFERENCED).increment(1);
1199          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1200            context.getCounter("unref", keyString).increment(1);
1201          }
1202        } else {
1203          if (refs.size() > 1) {
1204            // Skip first reference.
1205            for (int i = 1; i < refs.size(); i++) {
1206              context.write(key,
1207                new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
1208            }
1209            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
1210          }
1211          // node is defined and referenced
1212          context.getCounter(Counts.REFERENCED).increment(1);
1213        }
1214      }
1215
1216      /**
1217       * Dump out extra info around references if there are any. Helps debugging.
1218       * @return StringBuilder filled with references if any.
1219       * @throws IOException
1220       */
1221      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1222          final List<byte []> refs)
1223      throws IOException {
1224        StringBuilder refsSb = null;
1225        if (refs.isEmpty()) return refsSb;
1226        refsSb = new StringBuilder();
1227        String comma = "";
1228        // If a row is a reference but has no define, print the content of the row that has
1229        // this row as a 'prev'; it will help debug.  The missing row was written just before
1230        // the row we are dumping out here.
1231        TableName tn = getTableName(context.getConfiguration());
1232        try (Table t = this.connection.getTable(tn)) {
1233          for (byte [] ref : refs) {
1234            Result r = t.get(new Get(ref));
1235            List<Cell> cells = r.listCells();
1236            String ts = (cells != null && !cells.isEmpty())?
1237                new java.util.Date(cells.get(0).getTimestamp()).toString(): "";
1238            byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1239            String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
1240            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1241            long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
1242            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1243            String refRegionLocation = "";
1244            String keyRegionLocation = "";
1245            if (b != null && b.length > 0) {
1246              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1247                HRegionLocation hrl = rl.getRegionLocation(b);
1248                if (hrl != null) refRegionLocation = hrl.toString();
1249                // Key here probably has trailing zeros on it.
1250                hrl = rl.getRegionLocation(key.getBytes());
1251                if (hrl != null) keyRegionLocation = hrl.toString();
1252              }
1253            }
1254            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
1255              ", refPrevEqualsKey=" +
1256                (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
1257                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
1258                ", ref row date=" + ts + ", jobStr=" + jobStr +
1259                ", ref row count=" + count +
1260                ", ref row regionLocation=" + refRegionLocation +
1261                ", key row regionLocation=" + keyRegionLocation);
1262            refsSb.append(comma);
1263            comma = ",";
1264            refsSb.append(Bytes.toStringBinary(ref));
1265          }
1266        }
1267        return refsSb;
1268      }
1269    }
1270
1271    @Override
1272    public int run(String[] args) throws Exception {
1273      if (args.length != 2) {
1274        System.out.println("Usage : " + Verify.class.getSimpleName()
1275            + " <output dir> <num reducers>");
1276        return 0;
1277      }
1278
1279      String outputDir = args[0];
1280      int numReducers = Integer.parseInt(args[1]);
1281
1282      return run(outputDir, numReducers);
1283    }
1284
1285    public int run(String outputDir, int numReducers) throws Exception {
1286      return run(new Path(outputDir), numReducers);
1287    }
1288
1289    public int run(Path outputDir, int numReducers) throws Exception {
1290      LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);
1291
1292      job = Job.getInstance(getConf());
1293
1294      job.setJobName("Link Verifier");
1295      job.setNumReduceTasks(numReducers);
1296      job.setJarByClass(getClass());
1297
1298      setJobScannerConf(job);
1299
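      // The verify pass is a single full-table read of the prev pointers, so scan only the
      // columns it needs, use a large scanner caching value, and skip the block cache so the
      // scan does not churn it.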
1300      Scan scan = new Scan();
1301      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1302      scan.setCaching(10000);
1303      scan.setCacheBlocks(false);
1304      if (isMultiUnevenColumnFamilies(getConf())) {
1305        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1306        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1307      }
1308
1309      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1310          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1311      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
1312                                                     AbstractHBaseTool.class);
1313
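      // Speculative map attempts would only re-scan the same regions and add load, so disable
      // speculation for the verify job.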
1314      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1315
1316      job.setReducerClass(VerifyReducer.class);
1317      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1318      job.setOutputKeyClass(BytesWritable.class);
1319      job.setOutputValueClass(BytesWritable.class);
1320      TextOutputFormat.setOutputPath(job, outputDir);
1321
1322      boolean success = job.waitForCompletion(true);
1323
1324      if (success) {
1325        Counters counters = job.getCounters();
1326        if (null == counters) {
1327          LOG.warn("Counters were null, cannot verify Job completion."
1328              + " This is commonly a result of insufficient YARN configuration.");
1329          // We don't have access to the counters to know if we have "bad" counts
1330          return 0;
1331        }
1332
1333        // If we find no unexpected values, the job didn't outright fail
1334        if (verifyUnexpectedValues(counters)) {
1335          // We didn't check referenced+unreferenced counts, leave that to visual inspection
1336          return 0;
1337        }
1338      }
1339
1340      // We failed
1341      return 1;
1342    }
1343
1344    public boolean verify(long expectedReferenced) throws Exception {
1345      if (job == null) {
1346        throw new IllegalStateException("You should call run() first");
1347      }
1348
1349      Counters counters = job.getCounters();
1350      if (counters == null) {
1351        LOG.info("Counters object was null, write verification cannot be performed."
1352              + " This is commonly a result of insufficient YARN configuration.");
1353        return false;
1354      }
1355
1356      // Run through each check, even if we fail one early
1357      boolean success = verifyExpectedValues(expectedReferenced, counters);
1358
1359      if (!verifyUnexpectedValues(counters)) {
1360        // We found counter objects which imply failure
1361        success = false;
1362      }
1363
1364      if (!success) {
1365        handleFailure(counters);
1366      }
1367      return success;
1368    }
1369
1370    /**
1371     * Verify the values in the Counters against the expected number of entries written.
1372     *
1373     * @param expectedReferenced
     *          Expected number of referenced entries
1375     * @param counters
1376     *          The Job's Counters object
1377     * @return True if the values match what's expected, false otherwise
1378     */
1379    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1380      final Counter referenced = counters.findCounter(Counts.REFERENCED);
1381      final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
1382      boolean success = true;
1383
1384      if (expectedReferenced != referenced.getValue()) {
        LOG.error("Expected referenced count does not match the actual referenced count. " +
            "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
1387        success = false;
1388      }
1389
1390      if (unreferenced.getValue() > 0) {
1391        final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
1392        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
1393        LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
1394            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1395        success = false;
1396      }
1397
1398      return success;
1399    }
1400
1401    /**
1402     * Verify that the Counters don't contain values which indicate an outright failure from the Reducers.
1403     *
1404     * @param counters
1405     *          The Job's counters
1406     * @return True if the "bad" counter objects are 0, false otherwise
1407     */
1408    protected boolean verifyUnexpectedValues(Counters counters) {
1409      final Counter undefined = counters.findCounter(Counts.UNDEFINED);
1410      final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
1411      boolean success = true;
1412
1413      if (undefined.getValue() > 0) {
1414        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1415        success = false;
1416      }
1417
1418      if (lostfamilies.getValue() > 0) {
1419        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1420        success = false;
1421      }
1422
1423      return success;
1424    }
1425
1426    protected void handleFailure(Counters counters) throws IOException {
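      // Each counter in the "undef" / "unref" groups is named by a problem row key; look up and
      // log the region location for every one of them.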
1427      Configuration conf = job.getConfiguration();
1428      TableName tableName = getTableName(conf);
1429      try (Connection conn = ConnectionFactory.createConnection(conf)) {
1430        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
1431          CounterGroup g = counters.getGroup("undef");
1432          Iterator<Counter> it = g.iterator();
1433          while (it.hasNext()) {
1434            String keyString = it.next().getName();
1435            byte[] key = Bytes.toBytes(keyString);
1436            HRegionLocation loc = rl.getRegionLocation(key, true);
1437            LOG.error("undefined row " + keyString + ", " + loc);
1438          }
1439          g = counters.getGroup("unref");
1440          it = g.iterator();
1441          while (it.hasNext()) {
1442            String keyString = it.next().getName();
1443            byte[] key = Bytes.toBytes(keyString);
1444            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("unreferenced row " + keyString + ", " + loc);
1446          }
1447        }
1448      }
1449    }
1450  }
1451
1452  /**
1453   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
1454   * adds more data.
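   * <p>
   * One way to run it (the argument values and output path here are illustrative only):
   * <pre>
   * hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList loop 1 1 2000000 /tmp/itbll 1
   * </pre>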
1455   */
1456  static class Loop extends Configured implements Tool {
1457
1458    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
1459    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " +
1460        "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" +
1461        " <num walker threads>] \n" +
        "where <num nodes per mapper> should be a multiple of width*wrap multiplier, 25M by default\n" +
        "walkers will select and verify random flushed loops during Generation.";
1464
1465    IntegrationTestBigLinkedList it;
1466
1467    protected void runGenerator(int numMappers, long numNodes,
1468        String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers)
1469        throws Exception {
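      // Each iteration writes into a fresh, randomly named subdirectory of the output dir so
      // successive Generator runs never collide.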
1470      Path outputPath = new Path(outputDir);
1471      UUID uuid = UUID.randomUUID(); //create a random UUID.
1472      Path generatorOutput = new Path(outputPath, uuid.toString());
1473
1474      Generator generator = new Generator();
1475      generator.setConf(getConf());
1476      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
1477          numWalkers);
1478      if (retCode > 0) {
1479        throw new RuntimeException("Generator failed with return code: " + retCode);
1480      }
1481      if (numWalkers > 0) {
1482        if (!generator.verify()) {
1483          throw new RuntimeException("Generator.verify failed");
1484        }
1485      }
1486    }
1487
1488    protected void runVerify(String outputDir,
1489        int numReducers, long expectedNumNodes) throws Exception {
1490      Path outputPath = new Path(outputDir);
1491      UUID uuid = UUID.randomUUID(); //create a random UUID.
1492      Path iterationOutput = new Path(outputPath, uuid.toString());
1493
1494      Verify verify = new Verify();
1495      verify.setConf(getConf());
1496      int retCode = verify.run(iterationOutput, numReducers);
1497      if (retCode > 0) {
1498        throw new RuntimeException("Verify.run failed with return code: " + retCode);
1499      }
1500
1501      if (!verify.verify(expectedNumNodes)) {
1502        throw new RuntimeException("Verify.verify failed");
1503      }
1504      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
1505    }
1506
1507    @Override
1508    public int run(String[] args) throws Exception {
1509      if (args.length < 5) {
1510        System.err.println(USAGE);
1511        return 1;
1512      }
1513      try {
1514        int numIterations = Integer.parseInt(args[0]);
1515        int numMappers = Integer.parseInt(args[1]);
1516        long numNodes = Long.parseLong(args[2]);
1517        String outputDir = args[3];
1518        int numReducers = Integer.parseInt(args[4]);
1519        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1520        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1521        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
1522
1523        long expectedNumNodes = 0;
1524
1525        if (numIterations < 0) {
1526          numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
1527        }
1528        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
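        // Data accumulates across iterations, so each Verify pass expects the running total of
        // all nodes generated so far.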
1529        for (int i = 0; i < numIterations; i++) {
1530          LOG.info("Starting iteration = " + i);
1531          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
1532          expectedNumNodes += numMappers * numNodes;
1533          runVerify(outputDir, numReducers, expectedNumNodes);
1534        }
1535        return 0;
1536      } catch (NumberFormatException e) {
1537        System.err.println("Parsing loop arguments failed: " + e.getMessage());
1538        System.err.println(USAGE);
1539        return 1;
1540      }
1541    }
1542  }
1543
1544  /**
   * A standalone program that prints out portions of a list created by {@link Generator}.
1546   */
1547  private static class Print extends Configured implements Tool {
1548    @Override
1549    public int run(String[] args) throws Exception {
1550      Options options = new Options();
1551      options.addOption("s", "start", true, "start key");
1552      options.addOption("e", "end", true, "end key");
1553      options.addOption("l", "limit", true, "number to print");
1554
1555      GnuParser parser = new GnuParser();
1556      CommandLine cmd = null;
1557      try {
1558        cmd = parser.parse(options, args);
1559        if (cmd.getArgs().length != 0) {
1560          throw new ParseException("Command takes no arguments");
1561        }
1562      } catch (ParseException e) {
1563        System.err.println("Failed to parse command line " + e.getMessage());
1564        System.err.println();
1565        HelpFormatter formatter = new HelpFormatter();
1566        formatter.printHelp(getClass().getSimpleName(), options);
1567        System.exit(-1);
1568      }
1569
1570      Connection connection = ConnectionFactory.createConnection(getConf());
1571      Table table = connection.getTable(getTableName(getConf()));
1572
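      // Optionally restrict the scan to a start/stop row range and cap how many nodes to print
      // (100 by default), based on the command line options parsed above.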
1573      Scan scan = new Scan();
1574      scan.setBatch(10000);
1575
      if (cmd.hasOption("s")) {
        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
      }

      if (cmd.hasOption("e")) {
        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
      }

      int limit = cmd.hasOption("l") ? Integer.parseInt(cmd.getOptionValue("l")) : 100;
1587
1588      ResultScanner scanner = table.getScanner(scan);
1589
1590      CINode node = new CINode();
1591      Result result = scanner.next();
1592      int count = 0;
1593      while (result != null && count++ < limit) {
1594        node = getCINode(result, node);
1595        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
1596            Bytes.toStringBinary(node.prev), node.count, node.client);
1597        result = scanner.next();
1598      }
1599      scanner.close();
1600      table.close();
1601      connection.close();
1602
1603      return 0;
1604    }
1605  }
1606
1607  /**
   * A standalone program that deletes a single node.
1609   */
1610  private static class Delete extends Configured implements Tool {
1611    @Override
1612    public int run(String[] args) throws Exception {
1613      if (args.length != 1) {
1614        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
1615        return 0;
1616      }
1617      byte[] val = Bytes.toBytesBinary(args[0]);
1618
1619      org.apache.hadoop.hbase.client.Delete delete
1620        = new org.apache.hadoop.hbase.client.Delete(val);
1621
1622      try (Connection connection = ConnectionFactory.createConnection(getConf());
1623          Table table = connection.getTable(getTableName(getConf()))) {
1624        table.delete(delete);
1625      }
1626
1627      System.out.println("Delete successful");
1628      return 0;
1629    }
1630  }
1631
  abstract static class WalkerBase extends Configured {
1633    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
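      // Scan forward from startKey and return the first node found (or null if the table has no
      // row at or after that key); the "FSR" lines report how long finding the start row took.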
1634      Scan scan = new Scan();
1635      scan.setStartRow(startKey);
1636      scan.setBatch(1);
1637      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1638
1639      long t1 = System.currentTimeMillis();
1640      ResultScanner scanner = table.getScanner(scan);
1641      Result result = scanner.next();
1642      long t2 = System.currentTimeMillis();
1643      scanner.close();
1644
      if (result != null) {
1646        CINode node = getCINode(result, new CINode());
1647        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1648        return node;
1649      }
1650
1651      System.out.println("FSR " + (t2 - t1));
1652
1653      return null;
1654    }
1655    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1656      Get get = new Get(row);
1657      get.addColumn(FAMILY_NAME, COLUMN_PREV);
1658      Result result = table.get(get);
1659      return getCINode(result, node);
1660    }
1661  }
1662  /**
   * A standalone program that follows a linked list created by {@link Generator} and prints
1664   * timing info.
1665   */
1666  private static class Walker extends WalkerBase implements Tool {
1667
1668    public Walker(){}
1669
1670    @Override
1671    public int run(String[] args) throws IOException {
1672
1673      Options options = new Options();
1674      options.addOption("n", "num", true, "number of queries");
1675      options.addOption("s", "start", true, "key to start at, binary string");
1676      options.addOption("l", "logevery", true, "log every N queries");
1677
1678      GnuParser parser = new GnuParser();
1679      CommandLine cmd = null;
1680      try {
1681        cmd = parser.parse(options, args);
1682        if (cmd.getArgs().length != 0) {
1683          throw new ParseException("Command takes no arguments");
1684        }
1685      } catch (ParseException e) {
1686        System.err.println("Failed to parse command line " + e.getMessage());
1687        System.err.println();
1688        HelpFormatter formatter = new HelpFormatter();
1689        formatter.printHelp(getClass().getSimpleName(), options);
1690        System.exit(-1);
1691      }
1692
1693      long maxQueries = Long.MAX_VALUE;
1694      if (cmd.hasOption('n')) {
1695        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1696      }
1697      Random rand = new SecureRandom();
1698      boolean isSpecificStart = cmd.hasOption('s');
1699
1700      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1701      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1702
1703      Connection connection = ConnectionFactory.createConnection(getConf());
1704      Table table = connection.getTable(getTableName(getConf()));
1705      long numQueries = 0;
1706      // If isSpecificStart is set, only walk one list from that particular node.
1707      // Note that in case of circular (or P-shaped) list it will walk forever, as is
1708      // the case in normal run without startKey.
1709      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1710        if (!isSpecificStart) {
1711          startKey = new byte[ROWKEY_LENGTH];
1712          rand.nextBytes(startKey);
1713        }
1714        CINode node = findStartNode(table, startKey);
1715        if (node == null && isSpecificStart) {
1716          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1717        }
1718        numQueries++;
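      // Follow prev pointers until we hit an undefined node, a terminating node whose prev is
      // the NO_KEY sentinel, or the query budget runs out.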
1719        while (node != null && node.prev.length != NO_KEY.length &&
1720            numQueries < maxQueries) {
1721          byte[] prev = node.prev;
1722          long t1 = System.currentTimeMillis();
1723          node = getNode(prev, table, node);
1724          long t2 = System.currentTimeMillis();
1725          if (logEvery > 0 && numQueries % logEvery == 0) {
1726            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1727          }
1728          numQueries++;
1729          if (node == null) {
1730            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1731          } else if (node.prev.length == NO_KEY.length) {
1732            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1733          }
1734        }
1735      }
1736      table.close();
1737      connection.close();
1738      return 0;
1739    }
1740  }
1741
1742  private static class Clean extends Configured implements Tool {
1743    @Override public int run(String[] args) throws Exception {
1744      if (args.length < 1) {
1745        System.err.println("Usage: Clean <output dir>");
1746        return -1;
1747      }
1748
1749      Path p = new Path(args[0]);
1750      Configuration conf = getConf();
1751      TableName tableName = getTableName(conf);
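      // Drop the test table if it exists, then recursively delete the given output directory.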
1752      try (FileSystem fs = HFileSystem.get(conf);
1753          Connection conn = ConnectionFactory.createConnection(conf);
1754          Admin admin = conn.getAdmin()) {
1755        if (admin.tableExists(tableName)) {
1756          admin.disableTable(tableName);
1757          admin.deleteTable(tableName);
1758        }
1759
1760        if (fs.exists(p)) {
1761          fs.delete(p, true);
1762        }
1763      }
1764
1765      return 0;
1766    }
1767  }
1768
1769  static TableName getTableName(Configuration conf) {
1770    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1771  }
1772
1773  private static CINode getCINode(Result result, CINode node) {
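    // Copy the row key and the prev/count/client columns out of the Result, falling back to
    // NO_KEY, -1 and the empty string respectively when a column is missing.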
1774    node.key = Bytes.copy(result.getRow());
1775    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1776      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1777    } else {
1778      node.prev = NO_KEY;
1779    }
1780    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1781      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1782    } else {
1783      node.count = -1;
1784    }
1785    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1786      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1787    } else {
1788      node.client = "";
1789    }
1790    return node;
1791  }
1792
1793  protected IntegrationTestingUtility util;
1794
1795  @Override
1796  public void setUpCluster() throws Exception {
1797    util = getTestingUtil(getConf());
1798    boolean isDistributed = util.isDistributedCluster();
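    // Against a distributed cluster just check that it is usable; for local runs start a mini
    // HBase cluster with NUM_SLAVES_BASE slaves plus a mini MapReduce cluster.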
1799    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1800    if (!isDistributed) {
1801      util.startMiniMapReduceCluster();
1802    }
1803    this.setConf(util.getConfiguration());
1804  }
1805
1806  @Override
1807  public void cleanUpCluster() throws Exception {
1808    super.cleanUpCluster();
    // Mirror setUpCluster(): a mini MapReduce cluster is only started for non-distributed runs,
    // so only shut one down in that case.
    if (!util.isDistributedCluster()) {
1810      util.shutdownMiniMapReduceCluster();
1811    }
1812  }
1813
1814  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
1816  }
1817
1818  @Test
  public void testContinuousIngest() throws Exception {
1820    //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1821    Configuration conf = getTestingUtil(getConf()).getConfiguration();
1822    if (isMultiUnevenColumnFamilies(getConf())) {
1823      // make sure per CF flush is on
1824      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
1825    }
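    // Run a single Loop iteration: 1 mapper, 2,000,000 nodes per mapper, 1 reducer.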
1826    int ret =
1827        ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1828            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1829    org.junit.Assert.assertEquals(0, ret);
1830  }
1831
1832  private void usage() {
1833    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1834    printCommands();
1835  }
1836
1837  private void printCommands() {
1838    System.err.println("Commands:");
1839    System.err.println(" generator  Map only job that generates data.");
    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
    System.err.println("            look at the counts after running. REFERENCED and UNREFERENCED");
    System.err.println("            are ok. Any UNDEFINED counts are bad. Do not run this at the");
    System.err.println("            same time as the Generator.");
1844    System.err.println(" walker     " +
1845      "Standalone program that starts following a linked list & emits timing info.");
1846    System.err.println(" print      Standalone program that prints nodes in the linked list.");
    System.err.println(" delete     Standalone program that deletes a single node.");
    System.err.println(" loop       Program that loops through the Generator and Verify steps.");
    System.err.println(" clean      Program to clean all leftover detritus.");
1850    System.err.println(" search     Search for missing keys.");
1851    System.err.println("");
1852    System.err.println("General options:");
    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
    System.err.println("    Run using <tableName> as the table name.  Defaults to "
        + DEFAULT_TABLE_NAME);
    System.err.println(" -D" + HBaseTestingUtility.REGIONS_PER_SERVER_KEY + "=<# regions>");
    System.err.println("    Create table with presplit regions per server.  Defaults to "
        + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
1859
1860    System.err.println(" -DuseMob=<true|false>");
1861    System.err.println("    Create table so that the mob read/write path is forced.  " +
1862        "Defaults to false");
1863
1864    System.err.flush();
1865  }
1866
1867  @Override
1868  protected void processOptions(CommandLine cmd) {
1869    super.processOptions(cmd);
1870    String[] args = cmd.getArgs();
1871    //get the class, run with the conf
1872    if (args.length < 1) {
1873      printUsage(this.getClass().getSimpleName() +
1874        " <general options> COMMAND [<COMMAND options>]", "General options:", "");
1875      printCommands();
      // Have to throw an exception here to stop the processing. Looks ugly but it gets the
      // message across.
1877      throw new RuntimeException("Incorrect Number of args.");
1878    }
1879    toRun = args[0];
1880    otherArgs = Arrays.copyOfRange(args, 1, args.length);
1881  }
1882
1883  @Override
1884  public int runTestFromCommandLine() throws Exception {
1885    Tool tool = null;
1886    if (toRun.equalsIgnoreCase("Generator")) {
1887      tool = new Generator();
1888    } else if (toRun.equalsIgnoreCase("Verify")) {
1889      tool = new Verify();
1890    } else if (toRun.equalsIgnoreCase("Loop")) {
1891      Loop loop = new Loop();
1892      loop.it = this;
1893      tool = loop;
1894    } else if (toRun.equalsIgnoreCase("Walker")) {
1895      tool = new Walker();
1896    } else if (toRun.equalsIgnoreCase("Print")) {
1897      tool = new Print();
1898    } else if (toRun.equalsIgnoreCase("Delete")) {
1899      tool = new Delete();
1900    } else if (toRun.equalsIgnoreCase("Clean")) {
1901      tool = new Clean();
1902    } else if (toRun.equalsIgnoreCase("Search")) {
1903      tool = new Search();
1904    } else {
1905      usage();
1906      throw new RuntimeException("Unknown arg");
1907    }
1908
1909    return ToolRunner.run(getConf(), tool, otherArgs);
1910  }
1911
1912  @Override
1913  public TableName getTablename() {
1914    Configuration c = getConf();
1915    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1916  }
1917
1918  @Override
1919  protected Set<String> getColumnFamilies() {
1920    if (isMultiUnevenColumnFamilies(getConf())) {
1921      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
1922        Bytes.toString(TINY_FAMILY_NAME));
1923    } else {
1924      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1925    }
1926  }
1927
1928  private static void setJobConf(Job job, int numMappers, long numNodes,
1929      Integer width, Integer wrapMultiplier, Integer numWalkers) {
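    // width, wrapMultiplier and numWalkers are optional; only override the corresponding keys
    // when the caller supplied a value so the Generator defaults apply otherwise.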
1930    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1931    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1932    if (width != null) {
1933      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1934    }
1935    if (wrapMultiplier != null) {
1936      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1937    }
1938    if (numWalkers != null) {
1939      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
1940    }
1941  }
1942
1943  public static void setJobScannerConf(Job job) {
1944    // Make sure scanners log something useful to make debugging possible.
1945    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
1946    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1947  }
1948
1949  public static void main(String[] args) throws Exception {
1950    Configuration conf = HBaseConfiguration.create();
1951    IntegrationTestingUtility.setUseDistributedCluster(conf);
1952    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1953    System.exit(ret);
1954  }
1955}