001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.security.SecureRandom;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.EnumSet;
029import java.util.Iterator;
030import java.util.List;
031import java.util.Random;
032import java.util.Set;
033import java.util.SortedSet;
034import java.util.TreeSet;
035import java.util.UUID;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.atomic.AtomicInteger;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.conf.Configured;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.LocatedFileStatus;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.RemoteIterator;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.ClusterMetrics.Option;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.HColumnDescriptor;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.HRegionLocation;
051import org.apache.hadoop.hbase.HTableDescriptor;
052import org.apache.hadoop.hbase.IntegrationTestBase;
053import org.apache.hadoop.hbase.IntegrationTestingUtility;
054import org.apache.hadoop.hbase.MasterNotRunningException;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.client.Admin;
057import org.apache.hadoop.hbase.client.BufferedMutator;
058import org.apache.hadoop.hbase.client.BufferedMutatorParams;
059import org.apache.hadoop.hbase.client.Connection;
060import org.apache.hadoop.hbase.client.ConnectionConfiguration;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Get;
063import org.apache.hadoop.hbase.client.Mutation;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.RegionLocator;
066import org.apache.hadoop.hbase.client.Result;
067import org.apache.hadoop.hbase.client.ResultScanner;
068import org.apache.hadoop.hbase.client.Scan;
069import org.apache.hadoop.hbase.client.ScannerCallable;
070import org.apache.hadoop.hbase.client.Table;
071import org.apache.hadoop.hbase.fs.HFileSystem;
072import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
073import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
074import org.apache.hadoop.hbase.mapreduce.TableMapper;
075import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
076import org.apache.hadoop.hbase.mapreduce.WALPlayer;
077import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
078import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
079import org.apache.hadoop.hbase.testclassification.IntegrationTests;
080import org.apache.hadoop.hbase.util.AbstractHBaseTool;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.hbase.util.CommonFSUtils;
083import org.apache.hadoop.hbase.util.Random64;
084import org.apache.hadoop.hbase.util.RegionSplitter;
085import org.apache.hadoop.hbase.wal.WALEdit;
086import org.apache.hadoop.hbase.wal.WALKey;
087import org.apache.hadoop.io.BytesWritable;
088import org.apache.hadoop.io.NullWritable;
089import org.apache.hadoop.io.Writable;
090import org.apache.hadoop.mapreduce.Counter;
091import org.apache.hadoop.mapreduce.CounterGroup;
092import org.apache.hadoop.mapreduce.Counters;
093import org.apache.hadoop.mapreduce.InputFormat;
094import org.apache.hadoop.mapreduce.InputSplit;
095import org.apache.hadoop.mapreduce.Job;
096import org.apache.hadoop.mapreduce.JobContext;
097import org.apache.hadoop.mapreduce.Mapper;
098import org.apache.hadoop.mapreduce.RecordReader;
099import org.apache.hadoop.mapreduce.Reducer;
100import org.apache.hadoop.mapreduce.TaskAttemptContext;
101import org.apache.hadoop.mapreduce.TaskAttemptID;
102import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
103import org.apache.hadoop.mapreduce.lib.input.FileSplit;
104import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
105import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
106import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
107import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
108import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
109import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
110import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
111import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
112import org.apache.hadoop.util.Tool;
113import org.apache.hadoop.util.ToolRunner;
114import org.junit.Test;
115import org.junit.experimental.categories.Category;
116import org.slf4j.Logger;
117import org.slf4j.LoggerFactory;
118
119import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
120import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
121import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
122import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
123import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
124import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
125import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
126
/**
 * This is an integration test borrowed from goraci, written by Keith Turner,
 * which is in turn inspired by the Apache Accumulo test called continuous ingest (ci).
 * The original source code can be found here:
 * https://github.com/keith-turner/goraci
 * https://github.com/enis/goraci/
 *
 * Apache Accumulo [0] has a simple test suite that verifies that data is not
 * lost at scale. This test suite is called continuous ingest. This test runs
 * many ingest clients that continually create linked lists containing 25
 * million nodes. At some point the clients are stopped and a map reduce job is
 * run to ensure no linked list has a hole. A hole indicates data was lost.
 *
 * The nodes in the linked list are random. This causes each linked list to
 * spread across the table. Therefore if one part of a table loses data, it
 * will be detected by references in another part of the table.
 *
 * THE ANATOMY OF THE TEST
 *
 * Below is a rough sketch of how data is written. For specific details look at
 * the Generator code.
 *
 * 1. Write out 1 million nodes
 * 2. Flush the client
 * 3. Write out 1 million nodes that reference the previous million
 * 4. If this is the 25th set of 1 million nodes, then update the 1st set of a million to point
 *    to the last
 * 5. goto 1
 *
 * The key is that nodes only reference flushed nodes. Therefore a node should
 * never reference a missing node, even if the ingest client is killed at any
 * point in time.
 *
 * When running this test suite with Accumulo there is a script running in
 * parallel called the Agitator that randomly and continuously kills server
 * processes. The outcome was that many data loss bugs were found in Accumulo
 * by doing this. This test suite can also help find bugs that impact uptime
 * and stability when run for days or weeks.
 *
 * This test suite consists of the following:
 * - a few Java programs
 * - a little helper script to run the Java programs
 * - a maven script to build it
 *
 * When generating data, it's best to have each map task generate a multiple of
 * 25 million nodes. The reason for this is that circular linked lists are generated
 * every 25M nodes. Not generating a multiple of 25M will result in some nodes in the
 * linked list not having references. The loss of an unreferenced node cannot
 * be detected.
 *
 *
 * Below is a description of the Java programs
 *
 * Generator - A map only job that generates data. As stated previously, it's best to generate data
 * in multiples of 25M. An option is also available to allow concurrent walkers to select and walk
 * random flushed loops during this phase.
 *
 * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED and
 * UNREFERENCED are ok, any UNDEFINED counts are bad. Do not run at the same
 * time as the Generator.
 *
 * Walker - A standalone program that starts following a linked list and emits timing info.
 *
 * Print - A standalone program that prints nodes in the linked list
 *
 * Delete - A standalone program that deletes a single node
 *
 * This class can be run as a unit test, as an integration test, or from the command line
 *
 * ex:
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
 *    loop 2 1 100000 /temp 1 1000 50 1 0
 *
 */
196@Category(IntegrationTests.class)
197public class IntegrationTestBigLinkedList extends IntegrationTestBase {
198  protected static final byte[] NO_KEY = new byte[1];
199
200  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
201
202  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
203
204  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
205  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
206  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
207
208  //link to the id of the prev node in the linked list
209  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
210
211  //identifier of the mapred task that generated this row
212  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
213
214  //the id of the row within the same client.
215  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
216
  /** How many rows to write per map task. This has to be a multiple of width*wrap (25M by default) */
218  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
219    = "IntegrationTestBigLinkedList.generator.num_rows";
220
221  private static final String GENERATOR_NUM_MAPPERS_KEY
222    = "IntegrationTestBigLinkedList.generator.map.tasks";
223
224  private static final String GENERATOR_WIDTH_KEY
225    = "IntegrationTestBigLinkedList.generator.width";
226
227  private static final String GENERATOR_WRAP_KEY
228    = "IntegrationTestBigLinkedList.generator.wrap";
229
230  private static final String CONCURRENT_WALKER_KEY
231    = "IntegrationTestBigLinkedList.generator.concurrentwalkers";
232
233  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
234
235  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
236
237  private static final int WIDTH_DEFAULT = 1000000;
238  private static final int WRAP_DEFAULT = 25;
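  // With the defaults above, one full circular loop is WIDTH_DEFAULT * WRAP_DEFAULT = 25,000,000
  // nodes, which is why the number of nodes per map should be a multiple of 25M.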
239  private static final int ROWKEY_LENGTH = 16;
240
241  private static final int CONCURRENT_WALKER_DEFAULT = 0;
242
243  protected String toRun;
244  protected String[] otherArgs;
245
246  static class CINode {
247    byte[] key;
248    byte[] prev;
249    String client;
250    long count;
251  }
252
  /**
   * A Map only job that generates random linked lists and stores them.
   */
256  static class Generator extends Configured implements Tool {
257
258    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
259
    /**
     * Set this configuration if you want to test that single-column-family flush works. If set, we
     * will add a big column family and a small column family on either side of the usual ITBLL
     * 'meta' column family. When we write out the ITBLL, we will also write to the big column
     * family a value bigger than the ITBLL one, and to the small column family something way
     * smaller. The idea is that when flush-by-column-family rather than by region is enabled, we
     * can see if ITBLL is broken in any way. Here is how you would pass it:
     * <p>
     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
     */
271    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
272        "generator.multiple.columnfamilies";
273
274    /**
275     * Set this configuration if you want to scale up the size of test data quickly.
276     * <p>
277     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
278     * -Dgenerator.big.family.value.size=1024 generator 1 10 output
279     */
280    public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";
281
282
283    public static enum Counts {
284      SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION
285    }
286
    public static final String USAGE = "Usage : " + Generator.class.getSimpleName() +
        " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" +
        " <num walker threads>] \n" +
        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
        "walkers will verify random flushed loops during generation.";
292
293    public Job job;
294
295    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
296      static class GeneratorInputSplit extends InputSplit implements Writable {
297        @Override
298        public long getLength() throws IOException, InterruptedException {
299          return 1;
300        }
301        @Override
302        public String[] getLocations() throws IOException, InterruptedException {
303          return new String[0];
304        }
305        @Override
306        public void readFields(DataInput arg0) throws IOException {
307        }
308        @Override
309        public void write(DataOutput arg0) throws IOException {
310        }
311      }
312
313      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
314        private long count;
315        private long numNodes;
316        private Random64 rand;
317
318        @Override
319        public void close() throws IOException {
320        }
321
322        @Override
323        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
324          byte[] bytes = new byte[ROWKEY_LENGTH];
325          rand.nextBytes(bytes);
326          return new BytesWritable(bytes);
327        }
328
329        @Override
330        public NullWritable getCurrentValue() throws IOException, InterruptedException {
331          return NullWritable.get();
332        }
333
334        @Override
335        public float getProgress() throws IOException, InterruptedException {
336          return (float)(count / (double)numNodes);
337        }
338
339        @Override
340        public void initialize(InputSplit arg0, TaskAttemptContext context)
341            throws IOException, InterruptedException {
342          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
343          // Use Random64 to avoid issue described in HBASE-21256.
344          rand = new Random64();
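          // (At ITBLL scale java.util.Random can produce duplicate row keys; Random64 generates
          // 64-bit random values and avoids that.)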
345        }
346
347        @Override
348        public boolean nextKeyValue() throws IOException, InterruptedException {
349          return count++ < numNodes;
350        }
351
352      }
353
354      @Override
355      public RecordReader<BytesWritable,NullWritable> createRecordReader(
356          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
357        GeneratorRecordReader rr = new GeneratorRecordReader();
358        rr.initialize(split, context);
359        return rr;
360      }
361
362      @Override
363      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
364        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
365
366        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);
367
368        for (int i = 0; i < numMappers; i++) {
369          splits.add(new GeneratorInputSplit());
370        }
371
372        return splits;
373      }
374    }
375
    /** Ensure each output file from the previous job goes, unsplit, to a single mapper in the current job. */
377    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
378      @Override
379      protected boolean isSplitable(JobContext context, Path filename) {
380        return false;
381      }
382    }
383
384    /**
385     * Some ASCII art time:
386     * <p>
387     * [ . . . ] represents one batch of random longs of length WIDTH
388     * <pre>
389     *                _________________________
390     *               |                  ______ |
391     *               |                 |      ||
392     *             .-+-----------------+-----.||
393     *             | |                 |     |||
394     * first   = [ . . . . . . . . . . . ]   |||
395     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
396     *             | | | | | | | | | | |     |||
397     * prev    = [ . . . . . . . . . . . ]   |||
398     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
399     *             | | | | | | | | | | |     |||
400     * current = [ . . . . . . . . . . . ]   |||
401     *                                       |||
402     * ...                                   |||
403     *                                       |||
404     * last    = [ . . . . . . . . . . . ]   |||
405     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
406     *             |                 |________||
407     *             |___________________________|
408     * </pre>
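     * <p>
     * In words: every node in 'current' links back to the node at the same index in 'prev', so
     * each of the WIDTH columns above forms its own chain. After WRAP batches have been persisted,
     * 'first' is rotated left by one slot and re-persisted pointing at the last batch, which
     * stitches the WIDTH chains together into one circular list of width * wrap nodes.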
409     */
410
411    static class GeneratorMapper
412      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
413
414      byte[][] first = null;
415      byte[][] prev = null;
416      byte[][] current = null;
417      byte[] id;
418      long count = 0;
419      int i;
420      BufferedMutator mutator;
421      Connection connection;
422      long numNodes;
423      long wrap;
424      int width;
425      boolean multipleUnevenColumnFamilies;
426      byte[] tinyValue = new byte[] { 't' };
427      byte[] bigValue = null;
428      Configuration conf;
429
430      volatile boolean walkersStop;
431      int numWalkers;
432      volatile List<Long> flushedLoops = new ArrayList<>();
433      List<Thread> walkers = new ArrayList<>();
434
435      @Override
436      protected void setup(Context context) throws IOException, InterruptedException {
437        id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
438        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
439        instantiateHTable();
440        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
441        current = new byte[this.width][];
442        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
443        this.wrap = (long)wrapMultiplier * width;
444        this.numNodes = context.getConfiguration().getLong(
445            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
446        if (this.numNodes < this.wrap) {
447          this.wrap = this.numNodes;
448        }
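        // With the defaults, wrap is 25 * 1,000,000 nodes: a loop is closed every 'wrap' rows, and
        // numNodes must be a multiple of both width and wrap (checked below) so the last loop is
        // closed too.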
449        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
        this.numWalkers =
          context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
451        this.walkersStop = false;
452        this.conf = context.getConfiguration();
453
454        if (multipleUnevenColumnFamilies) {
455          int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
456          int limit = context.getConfiguration().getInt(
457            ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
458            ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);
459
460          Preconditions.checkArgument(
461            n <= limit,
462            "%s(%s) > %s(%s)",
463            BIG_FAMILY_VALUE_SIZE_KEY, n, ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);
464
465          bigValue = new byte[n];
466          ThreadLocalRandom.current().nextBytes(bigValue);
467          LOG.info("Create a bigValue with " + n + " bytes.");
468        }
469
470        Preconditions.checkArgument(
471          numNodes > 0,
472          "numNodes(%s) <= 0",
473          numNodes);
474        Preconditions.checkArgument(
475          numNodes % width == 0,
476          "numNodes(%s) mod width(%s) != 0",
477          numNodes, width);
478        Preconditions.checkArgument(
479          numNodes % wrap == 0,
480          "numNodes(%s) mod wrap(%s) != 0",
481          numNodes, wrap
482        );
483      }
484
485      protected void instantiateHTable() throws IOException {
486        mutator = connection.getBufferedMutator(
487            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
488                .writeBufferSize(4 * 1024 * 1024));
489      }
490
491      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
493        joinWalkers();
494        mutator.close();
495        connection.close();
496      }
497
498      @Override
499      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
500        current[i] = new byte[key.getLength()];
501        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
502        if (++i == current.length) {
          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i={}",
            current.length, count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i);
505          persist(output, count, prev, current, id);
506          i = 0;
507
508          if (first == null) {
509            first = current;
510          }
511          prev = current;
512          current = new byte[this.width][];
513
514          count += current.length;
515          output.setStatus("Count " + count);
516
517          if (count % wrap == 0) {
            // This block of code turns the 1 million linked lists of length 25 into one giant
            // circular linked list of 25 million nodes.
520            circularLeftShift(first);
521            persist(output, -1, prev, first, null);
522            // At this point the entire loop has been flushed so we can add one of its nodes to the
523            // concurrent walker
524            if (numWalkers > 0) {
525              addFlushed(key.getBytes());
526              if (walkers.isEmpty()) {
527                startWalkers(numWalkers, conf, output);
528              }
529            }
530            first = null;
531            prev = null;
532          }
533        }
534      }
535
536      private static <T> void circularLeftShift(T[] first) {
537        T ez = first[0];
538        System.arraycopy(first, 1, first, 0, first.length - 1);
539        first[first.length - 1] = ez;
540      }
541
542      private void addFlushed(byte[] rowKey) {
543        synchronized (flushedLoops) {
544          flushedLoops.add(Bytes.toLong(rowKey));
545          flushedLoops.notifyAll();
546        }
547      }
548
549      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
550          throws IOException {
551        for (int i = 0; i < current.length; i++) {
552
553          if (i % 100 == 0) {
554            // Tickle progress every so often else maprunner will think us hung
555            output.progress();
556          }
557
558          Put put = new Put(current[i]);
559          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
560
561          if (count >= 0) {
562            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
563          }
564          if (id != null) {
565            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
566          }
567          // See if we are to write multiple columns.
568          if (this.multipleUnevenColumnFamilies) {
569            // Use any column name.
570            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
571            // Use any column name.
572            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
573          }
574          mutator.mutate(put);
575        }
576
577        mutator.flush();
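        // The flush is what guarantees the invariant that a node only ever references rows that
        // have already been persisted, so a killed ingest client cannot leave a dangling link.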
578      }
579
580      private void startWalkers(int numWalkers, Configuration conf, Context context) {
581        LOG.info("Starting " + numWalkers + " concurrent walkers");
582        for (int i = 0; i < numWalkers; i++) {
583          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
584          walker.start();
585          walkers.add(walker);
586        }
587      }
588
589      private void joinWalkers() {
590        walkersStop = true;
591        synchronized (flushedLoops) {
592          flushedLoops.notifyAll();
593        }
594        for (Thread walker : walkers) {
595          try {
596            walker.join();
597          } catch (InterruptedException e) {
598            // no-op
599          }
600        }
601      }
602
      /**
       * Continuously selects and walks random flushed loops concurrently with the GeneratorMapper
       * by spawning ConcurrentWalkers with specified start nodes. These ConcurrentWalkers are
       * configured to only log erroneous nodes.
       */
608
609      public class ContinuousConcurrentWalker implements Runnable {
610
611        ConcurrentWalker walker;
612        Configuration conf;
613        Context context;
614        Random rand;
615
616        public ContinuousConcurrentWalker(Configuration conf, Context context) {
617          this.conf = conf;
618          this.context = context;
619          rand = new Random();
620        }
621
622        @Override
623        public void run() {
624          while (!walkersStop) {
625            try {
626              long node = selectLoop();
627              try {
628                walkLoop(node);
629              } catch (IOException e) {
                context.getCounter(Counts.IOEXCEPTION).increment(1L);
631                return;
632              }
633            } catch (InterruptedException e) {
634              return;
635            }
636          }
637        }
638
639        private void walkLoop(long node) throws IOException {
640          walker = new ConcurrentWalker(context);
641          walker.setConf(conf);
642          walker.run(node, wrap);
643        }
644
        private long selectLoop() throws InterruptedException {
646          synchronized (flushedLoops) {
647            while (flushedLoops.isEmpty() && !walkersStop) {
648              flushedLoops.wait();
649            }
650            if (walkersStop) {
651              throw new InterruptedException();
652            }
653            return flushedLoops.get(rand.nextInt(flushedLoops.size()));
654          }
655        }
656      }
657
658      public static class ConcurrentWalker extends WalkerBase {
659
660        Context context;
661
        public ConcurrentWalker(Context context) {
          this.context = context;
        }
663
664        public void run(long startKeyIn, long maxQueriesIn) throws IOException {
665
666          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
667          byte[] startKey = Bytes.toBytes(startKeyIn);
668
669          Connection connection = ConnectionFactory.createConnection(getConf());
670          Table table = connection.getTable(getTableName(getConf()));
671          long numQueries = 0;
672          // If isSpecificStart is set, only walk one list from that particular node.
673          // Note that in case of circular (or P-shaped) list it will walk forever, as is
674          // the case in normal run without startKey.
675
676          CINode node = findStartNode(table, startKey);
677          if (node == null) {
678            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
679            throw new IOException("Start node not found: " + startKeyIn);
680          }
681          while (numQueries < maxQueries) {
682            numQueries++;
683            byte[] prev = node.prev;
684            long t1 = System.currentTimeMillis();
685            node = getNode(prev, table, node);
686            long t2 = System.currentTimeMillis();
687            if (node == null) {
688              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
              context.getCounter(Counts.UNDEFINED).increment(1L);
690            } else if (node.prev.length == NO_KEY.length) {
691              LOG.error("ConcurrentWalker found TERMINATING NODE: " +
692                  Bytes.toStringBinary(node.key));
              context.getCounter(Counts.TERMINATING).increment(1L);
694            } else {
695              // Increment for successful walk
              context.getCounter(Counts.SUCCESS).increment(1L);
697            }
698          }
699          table.close();
700          connection.close();
701        }
702      }
703    }
704
705    @Override
706    public int run(String[] args) throws Exception {
707      if (args.length < 3) {
708        System.err.println(USAGE);
709        return 1;
710      }
711      try {
712        int numMappers = Integer.parseInt(args[0]);
713        long numNodes = Long.parseLong(args[1]);
714        Path tmpOutput = new Path(args[2]);
715        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
716        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
717        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
718        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
719      } catch (NumberFormatException e) {
720        System.err.println("Parsing generator arguments failed: " + e.getMessage());
721        System.err.println(USAGE);
722        return 1;
723      }
724    }
725
726    protected void createSchema() throws IOException {
727      Configuration conf = getConf();
728      TableName tableName = getTableName(conf);
729      try (Connection conn = ConnectionFactory.createConnection(conf);
730          Admin admin = conn.getAdmin()) {
731        if (!admin.tableExists(tableName)) {
732          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
733          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
734          // Always add these families. Just skip writing to them when we do not test per CF flush.
735          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
736          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
737          // if -DuseMob=true force all data through mob path.
738          if (conf.getBoolean("useMob", false)) {
739            for (HColumnDescriptor hcd : htd.getColumnFamilies() ) {
740              hcd.setMobEnabled(true);
741              hcd.setMobThreshold(4);
742            }
743          }
744
745          // If we want to pre-split compute how many splits.
746          if (conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
747              HBaseTestingUtility.PRESPLIT_TEST_TABLE)) {
748            int numberOfServers =
749                admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
750                    .getLiveServerMetrics().size();
751            if (numberOfServers == 0) {
752              throw new IllegalStateException("No live regionservers");
753            }
754            int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
755                HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
756            int totalNumberOfRegions = numberOfServers * regionsPerServer;
757            LOG.info("Number of live regionservers: " + numberOfServers + ", " +
758                "pre-splitting table into " + totalNumberOfRegions + " regions " +
759                "(default regions per server: " + regionsPerServer + ")");
760
761
762            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
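            // UniformSplit divides the byte keyspace evenly, which matches the uniformly random
            // 16-byte row keys this test generates.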
763
764            admin.createTable(htd, splits);
765          } else {
766            // Looks like we're just letting things play out.
            // Create a table with one region by default.
            // This will make the splitting machinery work hard.
769            admin.createTable(htd);
770          }
771        }
772      } catch (MasterNotRunningException e) {
773        LOG.error("Master not running", e);
774        throw new IOException(e);
775      }
776    }
777
778    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
779        Integer width, Integer wrapMultiplier, Integer numWalkers)
780        throws Exception {
781      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
782          + ", numNodes=" + numNodes);
783      Job job = Job.getInstance(getConf());
784
785      job.setJobName("Random Input Generator");
786      job.setNumReduceTasks(0);
787      job.setJarByClass(getClass());
788
789      job.setInputFormatClass(GeneratorInputFormat.class);
790      job.setOutputKeyClass(BytesWritable.class);
791      job.setOutputValueClass(NullWritable.class);
792
793      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
794
795      job.setMapperClass(Mapper.class); //identity mapper
796
797      FileOutputFormat.setOutputPath(job, tmpOutput);
798      job.setOutputFormatClass(SequenceFileOutputFormat.class);
799      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
800
801      boolean success = jobCompletion(job);
802
803      return success ? 0 : 1;
804    }
805
806    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
807        Integer width, Integer wrapMultiplier, Integer numWalkers)
808        throws Exception {
809      LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
810      createSchema();
811      job = Job.getInstance(getConf());
812
813      job.setJobName("Link Generator");
814      job.setNumReduceTasks(0);
815      job.setJarByClass(getClass());
816
817      FileInputFormat.setInputPaths(job, tmpOutput);
818      job.setInputFormatClass(OneFilePerMapperSFIF.class);
819      job.setOutputKeyClass(NullWritable.class);
820      job.setOutputValueClass(NullWritable.class);
821
822      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
823
824      setMapperForGenerator(job);
825
826      job.setOutputFormatClass(NullOutputFormat.class);
827
828      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
829      TableMapReduceUtil.addDependencyJars(job);
830      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
831                                                     AbstractHBaseTool.class);
832      TableMapReduceUtil.initCredentials(job);
833
834      boolean success = jobCompletion(job);
835
836      return success ? 0 : 1;
837    }
838
839    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
840        ClassNotFoundException {
841      boolean success = job.waitForCompletion(true);
842      return success;
843    }
844
845    protected void setMapperForGenerator(Job job) {
846      job.setMapperClass(GeneratorMapper.class);
847    }
848
849    public int run(int numMappers, long numNodes, Path tmpOutput,
850        Integer width, Integer wrapMultiplier, Integer numWalkers)
851        throws Exception {
852      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier,
853          numWalkers);
854      if (ret > 0) {
855        return ret;
856      }
857      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
858    }
859
860    public boolean verify() {
861      try {
862        Counters counters = job.getCounters();
863        if (counters == null) {
864          LOG.info("Counters object was null, Generator verification cannot be performed."
865              + " This is commonly a result of insufficient YARN configuration.");
866          return false;
867        }
868
869        if (counters.findCounter(Counts.TERMINATING).getValue() > 0 ||
870            counters.findCounter(Counts.UNDEFINED).getValue() > 0 ||
871            counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) {
872          LOG.error("Concurrent walker failed to verify during Generation phase");
873          LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue());
874          LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue());
875          LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue());
876          return false;
877        }
878      } catch (IOException e) {
879        LOG.info("Generator verification could not find counter");
880        return false;
881      }
882      return true;
883    }
884  }
885
  /**
   * Tool to search for missing rows in WALs and hfiles.
   * Pass in a file or dir of keys to search for. The key file must have been written by the Verify
   * step (we depend on the format it writes out). We'll read the keys in and then search for them
   * in the hbase WALs and oldWALs dirs (some of this is TODO).
   */
892  static class Search extends Configured implements Tool {
893    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
894    protected Job job;
895
896    private static void printUsage(final String error) {
897      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
898      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
899    }
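    // Example (illustrative path): "search /tmp/itbll-verify-output" loads the keys the Verify
    // step flagged as UNDEFINED from that directory and then searches the cluster's WALs and
    // oldWALs for them.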
900
901    @Override
902    public int run(String[] args) throws Exception {
903      if (args.length < 1 || args.length > 2) {
904        printUsage(null);
905        return 1;
906      }
907      Path inputDir = new Path(args[0]);
908      int numMappers = 1;
909      if (args.length > 1) {
910        numMappers = Integer.parseInt(args[1]);
911      }
912      return run(inputDir, numMappers);
913    }
914
915    /**
916     * WALPlayer override that searches for keys loaded in the setup.
917     */
918    public static class WALSearcher extends WALPlayer {
919      public WALSearcher(Configuration conf) {
920        super(conf);
921      }
922
923      /**
924       * The actual searcher mapper.
925       */
926      public static class WALMapperSearcher extends WALMapper {
927        private SortedSet<byte []> keysToFind;
928        private AtomicInteger rows = new AtomicInteger(0);
929
930        @Override
931        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
932            throws IOException {
933          super.setup(context);
934          try {
935            this.keysToFind = readKeysToSearch(context.getConfiguration());
936            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
937          } catch (InterruptedException e) {
938            throw new InterruptedIOException(e.toString());
939          }
940        }
941
942        @Override
943        protected boolean filter(Context context, Cell cell) {
944          // TODO: Can I do a better compare than this copying out key?
945          byte [] row = new byte [cell.getRowLength()];
946          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
947          boolean b = this.keysToFind.contains(row);
948          if (b) {
949            String keyStr = Bytes.toStringBinary(row);
950            try {
951              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
952            } catch (IOException|InterruptedException e) {
953              LOG.warn(e.toString(), e);
954            }
955            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
956              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
957            }
958            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
959          }
960          return b;
961        }
962      }
963
964      // Put in place the above WALMapperSearcher.
965      @Override
966      public Job createSubmittableJob(String[] args) throws IOException {
967        Job job = super.createSubmittableJob(args);
968        // Call my class instead.
969        job.setJarByClass(WALMapperSearcher.class);
970        job.setMapperClass(WALMapperSearcher.class);
971        job.setOutputFormatClass(NullOutputFormat.class);
972        return job;
973      }
974    }
975
976    static final String FOUND_GROUP_KEY = "Found";
977    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
978
979    public int run(Path inputDir, int numMappers) throws Exception {
980      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
981      SortedSet<byte []> keys = readKeysToSearch(getConf());
982      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
983      LOG.info("Count of keys to find: " + keys.size());
984      for(byte [] key: keys)  LOG.info("Key: " + Bytes.toStringBinary(key));
985      // Now read all WALs. In two dirs. Presumes certain layout.
986      Path walsDir = new Path(
987          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
988      Path oldWalsDir = new Path(
989          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
990      LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
991        " against " + getConf().get(HConstants.HBASE_DIR));
992      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
993          new String [] {walsDir.toString(), ""});
994      if (ret != 0) {
995        return ret;
996      }
997      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
998          new String [] {oldWalsDir.toString(), ""});
999    }
1000
1001    static SortedSet<byte []> readKeysToSearch(final Configuration conf)
1002    throws IOException, InterruptedException {
1003      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
1004      FileSystem fs = FileSystem.get(conf);
1005      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1006      if (!fs.exists(keysInputDir)) {
1007        throw new FileNotFoundException(keysInputDir.toString());
1008      }
1009      if (!fs.isDirectory(keysInputDir)) {
1010        throw new UnsupportedOperationException("TODO");
1011      } else {
1012        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
1013        while(iterator.hasNext()) {
1014          LocatedFileStatus keyFileStatus = iterator.next();
1015          // Skip "_SUCCESS" file.
1016          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
1017          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
1018        }
1019      }
1020      return result;
1021    }
1022
1023    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
1024        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
1025        InterruptedException {
1026      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1027      // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
1028      // what is missing.
1029      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
1030      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
1031          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
1032        InputSplit is =
1033          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
1034        rr.initialize(is, context);
1035        while (rr.nextKeyValue()) {
1036          rr.getCurrentKey();
1037          BytesWritable bw = rr.getCurrentValue();
1038          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
1039            byte[] key = new byte[rr.getCurrentKey().getLength()];
1040            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
1041                .getLength());
1042            result.add(key);
1043          }
1044        }
1045      }
1046      return result;
1047    }
1048  }
1049
1050  /**
1051   * A Map Reduce job that verifies that the linked lists generated by
1052   * {@link Generator} do not have any holes.
1053   */
1054  static class Verify extends Configured implements Tool {
1055
1056    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
1057    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
1058    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
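    // The mapper below emits DEF for every row it reads ("this row is defined"), or
    // DEF_LOST_FAMILIES when the expected big/tiny families are missing; any longer value emitted
    // under a key is a back-reference from some other row.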
1059
1060    protected Job job;
1061
1062    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
1063      private BytesWritable row = new BytesWritable();
1064      private BytesWritable ref = new BytesWritable();
1065
1066      private boolean multipleUnevenColumnFamilies;
1067
1068      @Override
1069      protected void setup(
1070          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
1071          throws IOException, InterruptedException {
1072        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
1073      }
1074
1075      @Override
1076      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
1078        byte[] rowKey = key.get();
1079        row.set(rowKey, 0, rowKey.length);
1080        if (multipleUnevenColumnFamilies
1081            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
1082              TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
1083          context.write(row, DEF_LOST_FAMILIES);
1084        } else {
1085          context.write(row, DEF);
1086        }
1087        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
1088        if (prev != null && prev.length > 0) {
1089          ref.set(prev, 0, prev.length);
1090          context.write(ref, row);
1091        } else {
1092          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
1093        }
1094      }
1095    }
1096
    /**
     * Don't change the order of these enums. Their ordinals are used as the type flag when we emit
     * problem rows from the reducer.
     */
1101    public static enum Counts {
1102      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
1103      LOST_FAMILIES
1104    }
1105
    /**
     * Per reducer, we output problem rows as byte arrays so they can be used as input for
     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a flag saying
     * what sort of emission it is; the flag is the Counts enum ordinal written as a short.
     */
1111    public static class VerifyReducer extends
1112        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
1113      private ArrayList<byte[]> refs = new ArrayList<>();
1114      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
1115        Counts.UNREFERENCED.ordinal(), new byte[] {}));
1116      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
1117        Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
1118
1119      private AtomicInteger rows = new AtomicInteger(0);
1120      private Connection connection;
1121
1122      @Override
      protected void setup(
          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
1125        super.setup(context);
1126        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
1127      }
1128
1129      @Override
1130      protected void cleanup(
1131          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1132          throws IOException, InterruptedException {
1133        if (this.connection != null) {
1134          this.connection.close();
1135        }
1136        super.cleanup(context);
1137      }
1138
      /**
       * @param ordinal the Counts ordinal to use as the type flag
       * @param r the row bytes to append after the flag
       * @return a new byte array that has <code>ordinal</code> as a prefix taking up
       * Bytes.SIZEOF_SHORT bytes, followed by <code>r</code>
       */
1145      public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
1146        byte[] prefix = Bytes.toBytes((short)ordinal);
1147        if (prefix.length != Bytes.SIZEOF_SHORT) {
1148          throw new RuntimeException("Unexpected size: " + prefix.length);
1149        }
1150        byte[] result = new byte[prefix.length + r.length];
1151        System.arraycopy(prefix, 0, result, 0, prefix.length);
1152        System.arraycopy(r, 0, result, prefix.length, r.length);
1153        return result;
1154      }
1155
      /**
       * @param bs flagged bytes as written by {@link #addPrefixFlag(int, byte[])}
       * @return the type from the Counts enum encoded in the prefix of this row
       */
1161      public static Counts whichType(final byte [] bs) {
1162        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
1163        return Counts.values()[ordinal];
1164      }
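      // For example: whichType(addPrefixFlag(Counts.UNDEFINED.ordinal(), row)) is
      // Counts.UNDEFINED, and getRowOnly applied to the same flagged bytes yields 'row' again.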
1165
      /**
       * @param bw flagged row as emitted by this reducer
       * @return the row bytes minus the type flag
       */
1170      public static byte[] getRowOnly(BytesWritable bw) {
1171        byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
1172        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
1173        return bytes;
1174      }
1175
1176      @Override
1177      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
1178          throws IOException, InterruptedException {
1179        int defCount = 0;
1180        boolean lostFamilies = false;
1181        refs.clear();
1182        for (BytesWritable type : values) {
1183          if (type.getLength() == DEF.getLength()) {
1184            defCount++;
1185            if (type.getBytes()[0] == 1) {
1186              lostFamilies = true;
1187            }
1188          } else {
1189            byte[] bytes = new byte[type.getLength()];
1190            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
1191            refs.add(bytes);
1192          }
1193        }
1194
1195        // TODO check for more than one def, should not happen
1196        StringBuilder refsSb = null;
1197        if (defCount == 0 || refs.size() != 1) {
1198          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1199          refsSb = dumpExtraInfoOnRefs(key, context, refs);
1200          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
1201            (refsSb != null? refsSb.toString(): ""));
1202        }
1203        if (lostFamilies) {
1204          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1205          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
1206          context.getCounter(Counts.LOST_FAMILIES).increment(1);
1207          context.write(key, LOSTFAM);
1208        }
1209
1210        if (defCount == 0 && refs.size() > 0) {
1211          // This is bad, found a node that is referenced but not defined. It must have been
1212          // lost, emit some info about this node for debugging purposes.
          // Write out a line per reference. If more than one, flag it.
1214          for (int i = 0; i < refs.size(); i++) {
1215            byte[] bs = refs.get(i);
1216            int ordinal;
1217            if (i <= 0) {
1218              ordinal = Counts.UNDEFINED.ordinal();
1219              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1220              context.getCounter(Counts.UNDEFINED).increment(1);
1221            } else {
1222              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
1223              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1224            }
1225          }
1226          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1227            // Print out missing row; doing get on reference gives info on when the referencer
1228            // was added which can help a little debugging. This info is only available in mapper
1229            // output -- the 'Linked List error Key...' log message above. What we emit here is
1230            // useless for debugging.
1231            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1232            context.getCounter("undef", keyString).increment(1);
1233          }
1234        } else if (defCount > 0 && refs.isEmpty()) {
1235          // node is defined but not referenced
1236          context.write(key, UNREF);
1237          context.getCounter(Counts.UNREFERENCED).increment(1);
1238          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1239            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1240            context.getCounter("unref", keyString).increment(1);
1241          }
1242        } else {
1243          if (refs.size() > 1) {
1244            // Skip first reference.
1245            for (int i = 1; i < refs.size(); i++) {
1246              context.write(key,
1247                new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
1248            }
1249            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
1250          }
1251          // node is defined and referenced
1252          context.getCounter(Counts.REFERENCED).increment(1);
1253        }
1254      }
1255
1256      /**
1257       * Dump out extra info around references if there are any. Helps debugging.
1258       * @return StringBuilder filled with references if any.
1259       * @throws IOException
1260       */
1261      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1262          final List<byte []> refs)
1263      throws IOException {
1264        StringBuilder refsSb = null;
1265        if (refs.isEmpty()) return refsSb;
1266        refsSb = new StringBuilder();
1267        String comma = "";
1268        // If a row is a reference but has no define, print the content of the row that has
1269        // this row as a 'prev'; it will help debug.  The missing row was written just before
1270        // the row we are dumping out here.
1271        TableName tn = getTableName(context.getConfiguration());
1272        try (Table t = this.connection.getTable(tn)) {
1273          for (byte [] ref : refs) {
1274            Result r = t.get(new Get(ref));
1275            List<Cell> cells = r.listCells();
1276            String ts = (cells != null && !cells.isEmpty())?
1277                new java.util.Date(cells.get(0).getTimestamp()).toString(): "";
1278            byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1279            String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
1280            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1281            long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
1282            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1283            String refRegionLocation = "";
1284            String keyRegionLocation = "";
1285            if (b != null && b.length > 0) {
1286              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1287                HRegionLocation hrl = rl.getRegionLocation(b);
1288                if (hrl != null) refRegionLocation = hrl.toString();
1289                // Key here probably has trailing zeros on it.
1290                hrl = rl.getRegionLocation(key.getBytes());
1291                if (hrl != null) keyRegionLocation = hrl.toString();
1292              }
1293            }
1294            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
1295              ", refPrevEqualsKey=" +
1296                (b != null && Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
1297                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
1298                ", ref row date=" + ts + ", jobStr=" + jobStr +
1299                ", ref row count=" + count +
1300                ", ref row regionLocation=" + refRegionLocation +
1301                ", key row regionLocation=" + keyRegionLocation);
1302            refsSb.append(comma);
1303            comma = ",";
1304            refsSb.append(Bytes.toStringBinary(ref));
1305          }
1306        }
1307        return refsSb;
1308      }
1309    }
1310
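    /**
     * Command-line entry point. Expects an output dir and a reducer count, then delegates to
     * {@link #run(String, int)}.
     */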
1311    @Override
1312    public int run(String[] args) throws Exception {
1313      if (args.length != 2) {
1314        System.out.println("Usage : " + Verify.class.getSimpleName()
1315            + " <output dir> <num reducers>");
1316        return 0;
1317      }
1318
1319      String outputDir = args[0];
1320      int numReducers = Integer.parseInt(args[1]);
1321
1322      return run(outputDir, numReducers);
1323    }
1324
1325    public int run(String outputDir, int numReducers) throws Exception {
1326      return run(new Path(outputDir), numReducers);
1327    }
1328
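    /**
     * Configures and runs the Verify map reduce job: scans the table's prev column (plus the
     * big and tiny families when the multi-family mode is enabled) through {@link VerifyMapper}
     * and {@link VerifyReducer}. Returns 0 when the job completes successfully and the counters
     * (if available) show no unexpected values; returns 1 otherwise.
     */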
1329    public int run(Path outputDir, int numReducers) throws Exception {
1330      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
1331
1332      job = Job.getInstance(getConf());
1333
1334      job.setJobName("Link Verifier");
1335      job.setNumReduceTasks(numReducers);
1336      job.setJarByClass(getClass());
1337
1338      setJobScannerConf(job);
1339
1340      Scan scan = new Scan();
1341      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1342      scan.setCaching(10000);
1343      scan.setCacheBlocks(false);
1344      if (isMultiUnevenColumnFamilies(getConf())) {
1345        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1346        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1347      }
1348
1349      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1350          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1351      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
1352                                                     AbstractHBaseTool.class);
1353
1354      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1355
1356      job.setReducerClass(VerifyReducer.class);
1357      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1358      job.setOutputKeyClass(BytesWritable.class);
1359      job.setOutputValueClass(BytesWritable.class);
1360      TextOutputFormat.setOutputPath(job, outputDir);
1361
1362      boolean success = job.waitForCompletion(true);
1363
1364      if (success) {
1365        Counters counters = job.getCounters();
1366        if (null == counters) {
1367          LOG.warn("Counters were null, cannot verify Job completion."
1368              + " This is commonly a result of insufficient YARN configuration.");
1369          // We don't have access to the counters to know if we have "bad" counts
1370          return 0;
1371        }
1372
1373        // If we find no unexpected values, the job didn't outright fail
1374        if (verifyUnexpectedValues(counters)) {
1375          // We didn't check referenced+unreferenced counts, leave that to visual inspection
1376          return 0;
1377        }
1378      }
1379
1380      // We failed
1381      return 1;
1382    }
1383
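    /**
     * Checks the counters of the job started by {@link #run(Path, int)} against the expected
     * number of referenced nodes. Must be called after run(); on failure it logs the region
     * locations of the offending rows via {@link #handleFailure(Counters)} and returns false.
     */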
1384    public boolean verify(long expectedReferenced) throws Exception {
1385      if (job == null) {
1386        throw new IllegalStateException("You should call run() first");
1387      }
1388
1389      Counters counters = job.getCounters();
1390      if (counters == null) {
1391        LOG.info("Counters object was null, write verification cannot be performed."
1392              + " This is commonly a result of insufficient YARN configuration.");
1393        return false;
1394      }
1395
1396      // Run through each check, even if we fail one early
1397      boolean success = verifyExpectedValues(expectedReferenced, counters);
1398
1399      if (!verifyUnexpectedValues(counters)) {
1400        // We found counter objects which imply failure
1401        success = false;
1402      }
1403
1404      if (!success) {
1405        handleFailure(counters);
1406      }
1407      return success;
1408    }
1409
1410    /**
1411     * Verify the values in the Counters against the expected number of entries written.
1412     *
1413     * @param expectedReferenced
1414     *          Expected number of referenced entries
1415     * @param counters
1416     *          The Job's Counters object
1417     * @return True if the values match what's expected, false otherwise
1418     */
1419    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1420      final Counter referenced = counters.findCounter(Counts.REFERENCED);
1421      final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
1422      boolean success = true;
1423
1424      if (expectedReferenced != referenced.getValue()) {
1425        LOG.error("Expected referenced count does not match the actual referenced count. " +
1426            "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
1427        success = false;
1428      }
1429
1430      if (unreferenced.getValue() > 0) {
1431        final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
1432        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
1433        LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
1434            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1435        success = false;
1436      }
1437
1438      return success;
1439    }
1440
1441    /**
1442     * Verify that the Counters don't contain values which indicate an outright failure from the Reducers.
1443     *
1444     * @param counters
1445     *          The Job's counters
1446     * @return True if the "bad" counter objects are 0, false otherwise
1447     */
1448    protected boolean verifyUnexpectedValues(Counters counters) {
1449      final Counter undefined = counters.findCounter(Counts.UNDEFINED);
1450      final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
1451      boolean success = true;
1452
1453      if (undefined.getValue() > 0) {
1454        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1455        success = false;
1456      }
1457
1458      if (lostfamilies.getValue() > 0) {
1459        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1460        success = false;
1461      }
1462
1463      return success;
1464    }
1465
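    /**
     * On verification failure, walks the per-key "undef" and "unref" counter groups emitted by
     * the reducer and logs each row key together with its current region location to help
     * pinpoint where the data went missing.
     */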
1466    protected void handleFailure(Counters counters) throws IOException {
1467      Configuration conf = job.getConfiguration();
1468      TableName tableName = getTableName(conf);
1469      try (Connection conn = ConnectionFactory.createConnection(conf)) {
1470        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
1471          CounterGroup g = counters.getGroup("undef");
1472          Iterator<Counter> it = g.iterator();
1473          while (it.hasNext()) {
1474            String keyString = it.next().getName();
1475            byte[] key = Bytes.toBytes(keyString);
1476            HRegionLocation loc = rl.getRegionLocation(key, true);
1477            LOG.error("undefined row " + keyString + ", " + loc);
1478          }
1479          g = counters.getGroup("unref");
1480          it = g.iterator();
1481          while (it.hasNext()) {
1482            String keyString = it.next().getName();
1483            byte[] key = Bytes.toBytes(keyString);
1484            HRegionLocation loc = rl.getRegionLocation(key, true);
1485            LOG.error("unreferenced row " + keyString + ", " + loc);
1486          }
1487        }
1488      }
1489    }
1490  }
1491
1492  /**
1493   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
1494   * adds more data.
1495   */
1496  static class Loop extends Configured implements Tool {
1497
1498    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
1499    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " +
1500        "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" +
1501        " <num walker threads>]\n" +
1502        "where <num nodes per mapper> should be a multiple of width*wrap multiplier (25M by default)\n" +
1503        "and walkers will select and verify random flushed loops during Generation.";
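
    // One possible invocation through the enclosing IntegrationTestBigLinkedList tool
    // (output dir and sizing values below are hypothetical; assumes the hbase-it test jar is
    // on the classpath):
    //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList loop \
    //     1 4 25000000 /tmp/itbll-output 4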
1504
1505    IntegrationTestBigLinkedList it;
1506
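    /**
     * Runs one Generator pass, writing job output into a fresh random UUID subdirectory of
     * outputDir. Throws if the generator fails or, when walkers are enabled, if the concurrent
     * walk verification fails.
     */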
1507    protected void runGenerator(int numMappers, long numNodes,
1508        String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers)
1509        throws Exception {
1510      Path outputPath = new Path(outputDir);
1511      UUID uuid = UUID.randomUUID(); //create a random UUID.
1512      Path generatorOutput = new Path(outputPath, uuid.toString());
1513
1514      Generator generator = new Generator();
1515      generator.setConf(getConf());
1516      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
1517          numWalkers);
1518      if (retCode > 0) {
1519        throw new RuntimeException("Generator failed with return code: " + retCode);
1520      }
1521      if (numWalkers > 0) {
1522        if (!generator.verify()) {
1523          throw new RuntimeException("Generator.verify failed");
1524        }
1525      }
1526    }
1527
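    /**
     * Runs one Verify pass against the table, again into a fresh random UUID subdirectory, and
     * throws if either the job or the counter verification fails.
     */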
1528    protected void runVerify(String outputDir,
1529        int numReducers, long expectedNumNodes) throws Exception {
1530      Path outputPath = new Path(outputDir);
1531      UUID uuid = UUID.randomUUID(); //create a random UUID.
1532      Path iterationOutput = new Path(outputPath, uuid.toString());
1533
1534      Verify verify = new Verify();
1535      verify.setConf(getConf());
1536      int retCode = verify.run(iterationOutput, numReducers);
1537      if (retCode > 0) {
1538        throw new RuntimeException("Verify.run failed with return code: " + retCode);
1539      }
1540
1541      if (!verify.verify(expectedNumNodes)) {
1542        throw new RuntimeException("Verify.verify failed");
1543      }
1544      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
1545    }
1546
1547    @Override
1548    public int run(String[] args) throws Exception {
1549      if (args.length < 5) {
1550        System.err.println(USAGE);
1551        return 1;
1552      }
1553      try {
1554        int numIterations = Integer.parseInt(args[0]);
1555        int numMappers = Integer.parseInt(args[1]);
1556        long numNodes = Long.parseLong(args[2]);
1557        String outputDir = args[3];
1558        int numReducers = Integer.parseInt(args[4]);
1559        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1560        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1561        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
1562
1563        long expectedNumNodes = 0;
1564
1565        if (numIterations < 0) {
1566          numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
1567        }
1568        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
1569        for (int i = 0; i < numIterations; i++) {
1570          LOG.info("Starting iteration = " + i);
1571          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
1572          expectedNumNodes += numMappers * numNodes;
1573          runVerify(outputDir, numReducers, expectedNumNodes);
1574        }
1575        return 0;
1576      } catch (NumberFormatException e) {
1577        System.err.println("Parsing loop arguments failed: " + e.getMessage());
1578        System.err.println(USAGE);
1579        return 1;
1580      }
1581    }
1582  }
1583
1584  /**
1585   * A stand alone program that prints out portions of a list created by {@link Generator}
1586   */
1587  private static class Print extends Configured implements Tool {
1588    @Override
1589    public int run(String[] args) throws Exception {
1590      Options options = new Options();
1591      options.addOption("s", "start", true, "start key");
1592      options.addOption("e", "end", true, "end key");
1593      options.addOption("l", "limit", true, "number to print");
1594
1595      GnuParser parser = new GnuParser();
1596      CommandLine cmd = null;
1597      try {
1598        cmd = parser.parse(options, args);
1599        if (cmd.getArgs().length != 0) {
1600          throw new ParseException("Command takes no arguments");
1601        }
1602      } catch (ParseException e) {
1603        System.err.println("Failed to parse command line " + e.getMessage());
1604        System.err.println();
1605        HelpFormatter formatter = new HelpFormatter();
1606        formatter.printHelp(getClass().getSimpleName(), options);
1607        System.exit(-1);
1608      }
1609
1610      Connection connection = ConnectionFactory.createConnection(getConf());
1611      Table table = connection.getTable(getTableName(getConf()));
1612
1613      Scan scan = new Scan();
1614      scan.setBatch(10000);
1615
1616      if (cmd.hasOption("s"))
1617        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
1618
1619      if (cmd.hasOption("e"))
1620        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
1621
1622      int limit = 0;
1623      if (cmd.hasOption("l"))
1624        limit = Integer.parseInt(cmd.getOptionValue("l"));
1625      else
1626        limit = 100;
1627
1628      ResultScanner scanner = table.getScanner(scan);
1629
1630      CINode node = new CINode();
1631      Result result = scanner.next();
1632      int count = 0;
1633      while (result != null && count++ < limit) {
1634        node = getCINode(result, node);
1635        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
1636            Bytes.toStringBinary(node.prev), node.count, node.client);
1637        result = scanner.next();
1638      }
1639      scanner.close();
1640      table.close();
1641      connection.close();
1642
1643      return 0;
1644    }
1645  }
1646
1647  /**
1648   * A stand alone program that deletes a single node.
1649   */
1650  private static class Delete extends Configured implements Tool {
1651    @Override
1652    public int run(String[] args) throws Exception {
1653      if (args.length != 1) {
1654        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
1655        return 0;
1656      }
1657      byte[] val = Bytes.toBytesBinary(args[0]);
1658
1659      org.apache.hadoop.hbase.client.Delete delete
1660        = new org.apache.hadoop.hbase.client.Delete(val);
1661
1662      try (Connection connection = ConnectionFactory.createConnection(getConf());
1663          Table table = connection.getTable(getTableName(getConf()))) {
1664        table.delete(delete);
1665      }
1666
1667      System.out.println("Delete successful");
1668      return 0;
1669    }
1670  }
1671
1672  abstract static class WalkerBase extends Configured {
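    /**
     * Scans forward from startKey and returns the first row found as a CINode, printing the
     * scan latency as an "FSR" line; returns null when no row exists at or after startKey.
     */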
1673    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
1674      Scan scan = new Scan();
1675      scan.setStartRow(startKey);
1676      scan.setBatch(1);
1677      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1678
1679      long t1 = System.currentTimeMillis();
1680      ResultScanner scanner = table.getScanner(scan);
1681      Result result = scanner.next();
1682      long t2 = System.currentTimeMillis();
1683      scanner.close();
1684
1685      if (result != null) {
1686        CINode node = getCINode(result, new CINode());
1687        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1688        return node;
1689      }
1690
1691      System.out.println("FSR " + (t2 - t1));
1692
1693      return null;
1694    }
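    /**
     * Point-gets a single row's prev column and reuses the passed CINode instance for the result.
     */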
1695    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1696      Get get = new Get(row);
1697      get.addColumn(FAMILY_NAME, COLUMN_PREV);
1698      Result result = table.get(get);
1699      return getCINode(result, node);
1700    }
1701  }
1702  /**
1703   * A stand alone program that follows a linked list created by {@link Generator} and prints
1704   * timing info.
1705   */
1706  private static class Walker extends WalkerBase implements Tool {
1707
1708    public Walker(){}
1709
1710    @Override
1711    public int run(String[] args) throws IOException {
1712
1713      Options options = new Options();
1714      options.addOption("n", "num", true, "number of queries");
1715      options.addOption("s", "start", true, "key to start at, binary string");
1716      options.addOption("l", "logevery", true, "log every N queries");
1717
1718      GnuParser parser = new GnuParser();
1719      CommandLine cmd = null;
1720      try {
1721        cmd = parser.parse(options, args);
1722        if (cmd.getArgs().length != 0) {
1723          throw new ParseException("Command takes no arguments");
1724        }
1725      } catch (ParseException e) {
1726        System.err.println("Failed to parse command line " + e.getMessage());
1727        System.err.println();
1728        HelpFormatter formatter = new HelpFormatter();
1729        formatter.printHelp(getClass().getSimpleName(), options);
1730        System.exit(-1);
1731      }
1732
1733      long maxQueries = Long.MAX_VALUE;
1734      if (cmd.hasOption('n')) {
1735        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1736      }
1737      Random rand = new SecureRandom();
1738      boolean isSpecificStart = cmd.hasOption('s');
1739
1740      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1741      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1742
1743      Connection connection = ConnectionFactory.createConnection(getConf());
1744      Table table = connection.getTable(getTableName(getConf()));
1745      long numQueries = 0;
1746      // If isSpecificStart is set, only walk one list from that particular node.
1747      // Note that in the case of a circular (or P-shaped) list it will walk forever, as is
1748      // the case in a normal run without a startKey.
1749      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1750        if (!isSpecificStart) {
1751          startKey = new byte[ROWKEY_LENGTH];
1752          rand.nextBytes(startKey);
1753        }
1754        CINode node = findStartNode(table, startKey);
1755        if (node == null && isSpecificStart) {
1756          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1757        }
1758        numQueries++;
1759        while (node != null && node.prev.length != NO_KEY.length &&
1760            numQueries < maxQueries) {
1761          byte[] prev = node.prev;
1762          long t1 = System.currentTimeMillis();
1763          node = getNode(prev, table, node);
1764          long t2 = System.currentTimeMillis();
1765          if (logEvery > 0 && numQueries % logEvery == 0) {
1766            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1767          }
1768          numQueries++;
1769          if (node == null) {
1770            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1771          } else if (node.prev.length == NO_KEY.length) {
1772            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1773          }
1774        }
1775      }
1776      table.close();
1777      connection.close();
1778      return 0;
1779    }
1780  }
1781
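  /**
   * A stand alone program that disables and deletes the test table and removes the given
   * output directory.
   */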
1782  private static class Clean extends Configured implements Tool {
1783    @Override public int run(String[] args) throws Exception {
1784      if (args.length < 1) {
1785        System.err.println("Usage: Clean <output dir>");
1786        return -1;
1787      }
1788
1789      Path p = new Path(args[0]);
1790      Configuration conf = getConf();
1791      TableName tableName = getTableName(conf);
1792      try (FileSystem fs = HFileSystem.get(conf);
1793          Connection conn = ConnectionFactory.createConnection(conf);
1794          Admin admin = conn.getAdmin()) {
1795        if (admin.tableExists(tableName)) {
1796          admin.disableTable(tableName);
1797          admin.deleteTable(tableName);
1798        }
1799
1800        if (fs.exists(p)) {
1801          fs.delete(p, true);
1802        }
1803      }
1804
1805      return 0;
1806    }
1807  }
1808
1809  static TableName getTableName(Configuration conf) {
1810    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1811  }
1812
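  /**
   * Populates a CINode from a Result, substituting NO_KEY, -1 and the empty string when the
   * prev, count or client columns are missing.
   */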
1813  private static CINode getCINode(Result result, CINode node) {
1814    node.key = Bytes.copy(result.getRow());
1815    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1816      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1817    } else {
1818      node.prev = NO_KEY;
1819    }
1820    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1821      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1822    } else {
1823      node.count = -1;
1824    }
1825    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1826      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1827    } else {
1828      node.client = "";
1829    }
1830    return node;
1831  }
1832
1833  protected IntegrationTestingUtility util;
1834
1835  @Override
1836  public void setUpCluster() throws Exception {
1837    util = getTestingUtil(getConf());
1838    boolean isDistributed = util.isDistributedCluster();
1839    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1840    if (!isDistributed) {
1841      util.startMiniMapReduceCluster();
1842    }
1843    this.setConf(util.getConfiguration());
1844  }
1845
1846  @Override
1847  public void cleanUpCluster() throws Exception {
1848    super.cleanUpCluster();
1849    if (util.isDistributedCluster()) {
1850      util.shutdownMiniMapReduceCluster();
1851    }
1852  }
1853
1854  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
1855    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
1856  }
1857
1858  @Test
1859  public void testContinuousIngest() throws Exception {
1860    //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1861    Configuration conf = getTestingUtil(getConf()).getConfiguration();
1862    if (isMultiUnevenColumnFamilies(getConf())) {
1863      // make sure per CF flush is on
1864      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
1865    }
1866    int ret =
1867        ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1868            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1869    org.junit.Assert.assertEquals(0, ret);
1870  }
1871
1872  private void usage() {
1873    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1874    printCommands();
1875  }
1876
1877  private void printCommands() {
1878    System.err.println("Commands:");
1879    System.err.println(" generator  Map only job that generates data.");
1880    System.err.println(" verify     A map reduce job that looks for holes. Check the return code and");
1881    System.err.println("            look at the counts after running. Check that REFERENCED and");
1882    System.err.println("            UNREFERENCED are ok; any UNDEFINED counts are bad. Do not run");
1883    System.err.println("            concurrently with the Generator.");
1884    System.err.println(" walker     " +
1885      "Standalone program that starts following a linked list & emits timing info.");
1886    System.err.println(" print      Standalone program that prints nodes in the linked list.");
1887    System.err.println(" delete     Standalone program that deletes a single node.");
1888    System.err.println(" loop       Program that loops through the Generator and Verify steps.");
1889    System.err.println(" clean      Program to clean all left over detritus.");
1890    System.err.println(" search     Search for missing keys.");
1891    System.err.println("");
1892    System.err.println("General options:");
1893    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
1894    System.err.println("    Run using the <tableName> as the tablename.  Defaults to "
1895        + DEFAULT_TABLE_NAME);
1896    System.err.println(" -D" + HBaseTestingUtility.REGIONS_PER_SERVER_KEY + "=<# regions>");
1897    System.err.println("    Create table with presplit regions per server.  Defaults to "
1898        + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
1899
1900    System.err.println(" -DuseMob=<true|false>");
1901    System.err.println("    Create table so that the mob read/write path is forced.  " +
1902        "Defaults to false");
1903
1904    System.err.flush();
1905  }
1906
1907  @Override
1908  protected void processOptions(CommandLine cmd) {
1909    super.processOptions(cmd);
1910    String[] args = cmd.getArgs();
1911    //get the class, run with the conf
1912    if (args.length < 1) {
1913      printUsage(this.getClass().getSimpleName() +
1914        " <general options> COMMAND [<COMMAND options>]", "General options:", "");
1915      printCommands();
1916      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
1917      throw new RuntimeException("Incorrect Number of args.");
1918    }
1919    toRun = args[0];
1920    otherArgs = Arrays.copyOfRange(args, 1, args.length);
1921  }
1922
1923  @Override
1924  public int runTestFromCommandLine() throws Exception {
1925    Tool tool = null;
1926    if (toRun.equalsIgnoreCase("Generator")) {
1927      tool = new Generator();
1928    } else if (toRun.equalsIgnoreCase("Verify")) {
1929      tool = new Verify();
1930    } else if (toRun.equalsIgnoreCase("Loop")) {
1931      Loop loop = new Loop();
1932      loop.it = this;
1933      tool = loop;
1934    } else if (toRun.equalsIgnoreCase("Walker")) {
1935      tool = new Walker();
1936    } else if (toRun.equalsIgnoreCase("Print")) {
1937      tool = new Print();
1938    } else if (toRun.equalsIgnoreCase("Delete")) {
1939      tool = new Delete();
1940    } else if (toRun.equalsIgnoreCase("Clean")) {
1941      tool = new Clean();
1942    } else if (toRun.equalsIgnoreCase("Search")) {
1943      tool = new Search();
1944    } else {
1945      usage();
1946      throw new RuntimeException("Unknown arg");
1947    }
1948
1949    return ToolRunner.run(getConf(), tool, otherArgs);
1950  }
1951
1952  @Override
1953  public TableName getTablename() {
1954    Configuration c = getConf();
1955    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1956  }
1957
1958  @Override
1959  protected Set<String> getColumnFamilies() {
1960    if (isMultiUnevenColumnFamilies(getConf())) {
1961      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
1962        Bytes.toString(TINY_FAMILY_NAME));
1963    } else {
1964      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1965    }
1966  }
1967
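  /**
   * Copies the generator sizing parameters (mappers, nodes per mapper, width, wrap multiplier,
   * concurrent walkers) into the job configuration; null optional values leave the defaults
   * in place.
   */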
1968  private static void setJobConf(Job job, int numMappers, long numNodes,
1969      Integer width, Integer wrapMultiplier, Integer numWalkers) {
1970    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1971    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1972    if (width != null) {
1973      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1974    }
1975    if (wrapMultiplier != null) {
1976      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1977    }
1978    if (numWalkers != null) {
1979      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
1980    }
1981  }
1982
1983  public static void setJobScannerConf(Job job) {
1984    // Make sure scanners log something useful to make debugging possible.
1985    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
1986    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1987  }
1988
1989  public static void main(String[] args) throws Exception {
1990    Configuration conf = HBaseConfiguration.create();
1991    IntegrationTestingUtility.setUseDistributedCluster(conf);
1992    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1993    System.exit(ret);
1994  }
1995}