001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.security.SecureRandom;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Random;
031import java.util.Set;
032import java.util.SortedSet;
033import java.util.TreeSet;
034import java.util.UUID;
035import java.util.concurrent.ThreadLocalRandom;
036import java.util.concurrent.atomic.AtomicInteger;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.conf.Configured;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.LocatedFileStatus;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.fs.RemoteIterator;
043import org.apache.hadoop.hbase.Cell;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.HColumnDescriptor;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.HTableDescriptor;
050import org.apache.hadoop.hbase.IntegrationTestBase;
051import org.apache.hadoop.hbase.IntegrationTestingUtility;
052import org.apache.hadoop.hbase.MasterNotRunningException;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.BufferedMutator;
056import org.apache.hadoop.hbase.client.BufferedMutatorParams;
057import org.apache.hadoop.hbase.client.Connection;
058import org.apache.hadoop.hbase.client.ConnectionConfiguration;
059import org.apache.hadoop.hbase.client.ConnectionFactory;
060import org.apache.hadoop.hbase.client.Get;
061import org.apache.hadoop.hbase.client.Mutation;
062import org.apache.hadoop.hbase.client.Put;
063import org.apache.hadoop.hbase.client.RegionLocator;
064import org.apache.hadoop.hbase.client.Result;
065import org.apache.hadoop.hbase.client.ResultScanner;
066import org.apache.hadoop.hbase.client.Scan;
067import org.apache.hadoop.hbase.client.ScannerCallable;
068import org.apache.hadoop.hbase.client.Table;
069import org.apache.hadoop.hbase.fs.HFileSystem;
070import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
071import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
072import org.apache.hadoop.hbase.mapreduce.TableMapper;
073import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
074import org.apache.hadoop.hbase.mapreduce.WALPlayer;
075import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
076import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
077import org.apache.hadoop.hbase.testclassification.IntegrationTests;
078import org.apache.hadoop.hbase.util.AbstractHBaseTool;
079import org.apache.hadoop.hbase.util.Bytes;
080import org.apache.hadoop.hbase.util.CommonFSUtils;
081import org.apache.hadoop.hbase.util.Random64;
082import org.apache.hadoop.hbase.util.RegionSplitter;
083import org.apache.hadoop.hbase.wal.WALEdit;
084import org.apache.hadoop.hbase.wal.WALKey;
085import org.apache.hadoop.io.BytesWritable;
086import org.apache.hadoop.io.NullWritable;
087import org.apache.hadoop.io.Writable;
088import org.apache.hadoop.mapreduce.Counter;
089import org.apache.hadoop.mapreduce.CounterGroup;
090import org.apache.hadoop.mapreduce.Counters;
091import org.apache.hadoop.mapreduce.InputFormat;
092import org.apache.hadoop.mapreduce.InputSplit;
093import org.apache.hadoop.mapreduce.Job;
094import org.apache.hadoop.mapreduce.JobContext;
095import org.apache.hadoop.mapreduce.Mapper;
096import org.apache.hadoop.mapreduce.RecordReader;
097import org.apache.hadoop.mapreduce.Reducer;
098import org.apache.hadoop.mapreduce.TaskAttemptContext;
099import org.apache.hadoop.mapreduce.TaskAttemptID;
100import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
101import org.apache.hadoop.mapreduce.lib.input.FileSplit;
102import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
103import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
104import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
105import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
106import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
107import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
108import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
109import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
110import org.apache.hadoop.util.Tool;
111import org.apache.hadoop.util.ToolRunner;
112import org.junit.Test;
113import org.junit.experimental.categories.Category;
114import org.slf4j.Logger;
115import org.slf4j.LoggerFactory;
116
117import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
118import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
119import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
120import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
121import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
122import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
123import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
124
125/**
126 * <p>
127 * This is an integration test borrowed from goraci, written by Keith Turner,
 * which is in turn inspired by the Accumulo test called continuous ingest (ci).
129 * The original source code can be found here:
130 * <ul>
131 *   <li>https://github.com/keith-turner/goraci</li>
132 *   <li>https://github.com/enis/goraci/</li>
133 * </ul>
134 * </p>
135 * <p>
136 * Apache Accumulo [0] has a simple test suite that verifies that data is not
137 * lost at scale. This test suite is called continuous ingest. This test runs
138 * many ingest clients that continually create linked lists containing 25
139 * million nodes. At some point the clients are stopped and a map reduce job is
140 * run to ensure no linked list has a hole. A hole indicates data was lost.
141 * </p>
142 * <p>
143 * The nodes in the linked list are random. This causes each linked list to
144 * spread across the table. Therefore if one part of a table loses data, then it
145 * will be detected by references in another part of the table.
146 * </p>
147 * <p>
148 * <h3>THE ANATOMY OF THE TEST</h3>
149 *
 * Below is a rough sketch of how data is written. For specific details look at
151 * the Generator code.
152 * </p>
153 * <p>
154 * <ol>
155 * <li>Write out 1 million nodes</li>
156 * <li>Flush the client</li>
157 * <li>Write out 1 million that reference previous million</li>
158 * <li>If this is the 25th set of 1 million nodes, then update 1st set of
159 * million to point to last</li>
160 * <li>goto 1</li>
161 * </ol>
162 * </p>
163 * <p>
164 * The key is that nodes only reference flushed nodes. Therefore a node should
165 * never reference a missing node, even if the ingest client is killed at any
166 * point in time.
167 * </p>
168 * <p>
169 * When running this test suite w/ Accumulo there is a script running in
 * parallel called the Agitator that randomly and continuously kills server
171 * processes. The outcome was that many data loss bugs were found in Accumulo
172 * by doing this. This test suite can also help find bugs that impact uptime
173 * and stability when run for days or weeks.
174 * </p>
175 * <p>
 * This test suite consists of the following
177 * <ul>
178 * <li>a few Java programs</li>
179 * <li>a little helper script to run the java programs</li>
180 * <li>a maven script to build it</li>
181 * </ul>
182 * </p>
183 * <p>
 * When generating data, it's best to have each map task generate a multiple of
 * 25 million. The reason for this is that a circular linked list is generated
 * every 25M. Not generating a multiple of 25M will result in some nodes in the
 * linked list not having references. The loss of an unreferenced node cannot
 * be detected.
189 * </p>
190 * <p>
191 * <h3>Below is a description of the Java programs</h3>
192 * <ul>
193 * <li>
 * {@code Generator} - A map only job that generates data. As stated previously, it's best to
195 * generate data in multiples of 25M. An option is also available to allow concurrent walkers to
196 * select and walk random flushed loops during this phase.
197 * </li>
198 * <li>
199 * {@code Verify} - A map reduce job that looks for holes. Look at the counts after running.
200 * {@code REFERENCED} and {@code UNREFERENCED} are ok, any {@code UNDEFINED} counts are bad. Do not
201 * run at the same time as the Generator.
202 * </li>
203 * <li>
 * {@code Walker} - A standalone program that starts following a linked list and emits timing info.
205 * </li>
206 * <li>
207 * {@code Print} - A standalone program that prints nodes in the linked list
208 * </li>
209 * <li>
210 * {@code Delete} - A standalone program that deletes a single node
211 * </li>
212 * </ul>
213 *
214 * This class can be run as a unit test, as an integration test, or from the command line
215 * </p>
216 * <p>
217 * ex:
218 * <pre>
219 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
220 *    loop 2 1 100000 /temp 1 1000 50 1 0
221 * </pre>
222 * </p>
223 */
224@Category(IntegrationTests.class)
225public class IntegrationTestBigLinkedList extends IntegrationTestBase {
226  protected static final byte[] NO_KEY = new byte[1];
227
228  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
229
230  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
231
232  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
233  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
234  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
235
236  //link to the id of the prev node in the linked list
237  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
238
239  //identifier of the mapred task that generated this row
240  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
241
242  //the id of the row within the same client.
243  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
244
245  /** How many rows to write per map task. This has to be a multiple of 25M */
246  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
247    = "IntegrationTestBigLinkedList.generator.num_rows";
248
249  private static final String GENERATOR_NUM_MAPPERS_KEY
250    = "IntegrationTestBigLinkedList.generator.map.tasks";
251
252  private static final String GENERATOR_WIDTH_KEY
253    = "IntegrationTestBigLinkedList.generator.width";
254
255  private static final String GENERATOR_WRAP_KEY
256    = "IntegrationTestBigLinkedList.generator.wrap";
257
258  private static final String CONCURRENT_WALKER_KEY
259    = "IntegrationTestBigLinkedList.generator.concurrentwalkers";
260
261  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
262
263  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
264
265  private static final int WIDTH_DEFAULT = 1000000;
266  private static final int WRAP_DEFAULT = 25;
267  private static final int ROWKEY_LENGTH = 16;
268
269  private static final int CONCURRENT_WALKER_DEFAULT = 0;
270
271  protected String toRun;
272  protected String[] otherArgs;
273
274  static class CINode {
275    byte[] key;
276    byte[] prev;
277    String client;
278    long count;
279  }
280
281  /**
282   * A Map only job that generates random linked list and stores them.
283   */
284  static class Generator extends Configured implements Tool {
285
286    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
287
288    /**
289     * Set this configuration if you want to test single-column family flush works. If set, we will
290     * add a big column family and a small column family on either side of the usual ITBLL 'meta'
291     * column family. When we write out the ITBLL, we will also add to the big column family a value
292     * bigger than that for ITBLL and for small, something way smaller. The idea is that when
293     * flush-by-column family rather than by region is enabled, we can see if ITBLL is broke in any
294     * way. Here is how you would pass it:
295     * <p>
296     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
297     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
298     */
299    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
300        "generator.multiple.columnfamilies";
301
302    /**
303     * Set this configuration if you want to scale up the size of test data quickly.
304     * <p>
305     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
306     * -Dgenerator.big.family.value.size=1024 generator 1 10 output
307     */
308    public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";
309
310
311    public static enum Counts {
312      SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION
313    }
314
315    public static final String USAGE =  "Usage : " + Generator.class.getSimpleName() +
316        " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" +
317        " <num walker threads>] \n" +
318        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
319        "walkers will verify random flushed loop during Generation.";
320
321    public Job job;
322
323    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
324      static class GeneratorInputSplit extends InputSplit implements Writable {
325        @Override
326        public long getLength() throws IOException, InterruptedException {
327          return 1;
328        }
329        @Override
330        public String[] getLocations() throws IOException, InterruptedException {
331          return new String[0];
332        }
333        @Override
334        public void readFields(DataInput arg0) throws IOException {
335        }
336        @Override
337        public void write(DataOutput arg0) throws IOException {
338        }
339      }
340
341      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
342        private long count;
343        private long numNodes;
344        private Random64 rand;
345
346        @Override
347        public void close() throws IOException {
348        }
349
350        @Override
351        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
352          byte[] bytes = new byte[ROWKEY_LENGTH];
353          rand.nextBytes(bytes);
354          return new BytesWritable(bytes);
355        }
356
357        @Override
358        public NullWritable getCurrentValue() throws IOException, InterruptedException {
359          return NullWritable.get();
360        }
361
362        @Override
363        public float getProgress() throws IOException, InterruptedException {
364          return (float)(count / (double)numNodes);
365        }
366
367        @Override
368        public void initialize(InputSplit arg0, TaskAttemptContext context)
369            throws IOException, InterruptedException {
370          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
371          // Use Random64 to avoid issue described in HBASE-21256.
372          rand = new Random64();
373        }
374
375        @Override
376        public boolean nextKeyValue() throws IOException, InterruptedException {
377          return count++ < numNodes;
378        }
379
380      }
381
382      @Override
383      public RecordReader<BytesWritable,NullWritable> createRecordReader(
384          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
385        GeneratorRecordReader rr = new GeneratorRecordReader();
386        rr.initialize(split, context);
387        return rr;
388      }
389
390      @Override
391      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
392        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
393
394        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);
395
396        for (int i = 0; i < numMappers; i++) {
397          splits.add(new GeneratorInputSplit());
398        }
399
400        return splits;
401      }
402    }
403
404    /** Ensure output files from prev-job go to map inputs for current job */
405    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
406      @Override
407      protected boolean isSplitable(JobContext context, Path filename) {
408        return false;
409      }
410    }
411
412    /**
413     * Some ASCII art time:
414     * <p>
415     * [ . . . ] represents one batch of random longs of length WIDTH
416     * <pre>
417     *                _________________________
418     *               |                  ______ |
419     *               |                 |      ||
420     *             .-+-----------------+-----.||
421     *             | |                 |     |||
422     * first   = [ . . . . . . . . . . . ]   |||
423     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
424     *             | | | | | | | | | | |     |||
425     * prev    = [ . . . . . . . . . . . ]   |||
426     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
427     *             | | | | | | | | | | |     |||
428     * current = [ . . . . . . . . . . . ]   |||
429     *                                       |||
430     * ...                                   |||
431     *                                       |||
432     * last    = [ . . . . . . . . . . . ]   |||
433     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
434     *             |                 |________||
435     *             |___________________________|
436     * </pre>
437     */
438
439    static class GeneratorMapper
440      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
441
442      byte[][] first = null;
443      byte[][] prev = null;
444      byte[][] current = null;
445      byte[] id;
446      long count = 0;
447      int i;
448      BufferedMutator mutator;
449      Connection connection;
450      long numNodes;
451      long wrap;
452      int width;
453      boolean multipleUnevenColumnFamilies;
454      byte[] tinyValue = new byte[] { 't' };
455      byte[] bigValue = null;
456      Configuration conf;
457
458      volatile boolean walkersStop;
459      int numWalkers;
460      volatile List<Long> flushedLoops = new ArrayList<>();
461      List<Thread> walkers = new ArrayList<>();
462
463      @Override
464      protected void setup(Context context) throws IOException, InterruptedException {
465        id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
466        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
467        instantiateHTable();
468        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
469        current = new byte[this.width][];
470        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
471        this.wrap = (long)wrapMultiplier * width;
472        this.numNodes = context.getConfiguration().getLong(
473            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
474        if (this.numNodes < this.wrap) {
475          this.wrap = this.numNodes;
476        }
477        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
478        this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
479        this.walkersStop = false;
480        this.conf = context.getConfiguration();
481
482        if (multipleUnevenColumnFamilies) {
483          int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
484          int limit = context.getConfiguration().getInt(
485            ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
486            ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);
487
488          Preconditions.checkArgument(
489            n <= limit,
490            "%s(%s) > %s(%s)",
491            BIG_FAMILY_VALUE_SIZE_KEY, n, ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);
492
493          bigValue = new byte[n];
494          ThreadLocalRandom.current().nextBytes(bigValue);
495          LOG.info("Create a bigValue with " + n + " bytes.");
496        }
497
498        Preconditions.checkArgument(
499          numNodes > 0,
500          "numNodes(%s) <= 0",
501          numNodes);
502        Preconditions.checkArgument(
503          numNodes % width == 0,
504          "numNodes(%s) mod width(%s) != 0",
505          numNodes, width);
506        Preconditions.checkArgument(
507          numNodes % wrap == 0,
508          "numNodes(%s) mod wrap(%s) != 0",
509          numNodes, wrap
510        );
511      }
512
513      protected void instantiateHTable() throws IOException {
514        mutator = connection.getBufferedMutator(
515            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
516                .writeBufferSize(4 * 1024 * 1024));
517      }
518
519      @Override
520      protected void cleanup(Context context) throws IOException ,InterruptedException {
521        joinWalkers();
522        mutator.close();
523        connection.close();
524      }
525
526      @Override
527      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
528        current[i] = new byte[key.getLength()];
529        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
530        if (++i == current.length) {
531          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i=",
532            current.length, count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i);
533          persist(output, count, prev, current, id);
534          i = 0;
535
536          if (first == null) {
537            first = current;
538          }
539          prev = current;
540          current = new byte[this.width][];
541
542          count += current.length;
543          output.setStatus("Count " + count);
544
545          if (count % wrap == 0) {
546            // this block of code turns the 1 million linked list of length 25 into one giant
547            //circular linked list of 25 million
548            circularLeftShift(first);
549            persist(output, -1, prev, first, null);
550            // At this point the entire loop has been flushed so we can add one of its nodes to the
551            // concurrent walker
552            if (numWalkers > 0) {
553              addFlushed(key.getBytes());
554              if (walkers.isEmpty()) {
555                startWalkers(numWalkers, conf, output);
556              }
557            }
558            first = null;
559            prev = null;
560          }
561        }
562      }
563
564      private static <T> void circularLeftShift(T[] first) {
565        T ez = first[0];
566        System.arraycopy(first, 1, first, 0, first.length - 1);
567        first[first.length - 1] = ez;
568      }
569
570      private void addFlushed(byte[] rowKey) {
571        synchronized (flushedLoops) {
572          flushedLoops.add(Bytes.toLong(rowKey));
573          flushedLoops.notifyAll();
574        }
575      }
576
577      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
578          throws IOException {
579        for (int i = 0; i < current.length; i++) {
580
581          if (i % 100 == 0) {
582            // Tickle progress every so often else maprunner will think us hung
583            output.progress();
584          }
585
586          Put put = new Put(current[i]);
587          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
588
589          if (count >= 0) {
590            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
591          }
592          if (id != null) {
593            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
594          }
595          // See if we are to write multiple columns.
596          if (this.multipleUnevenColumnFamilies) {
597            // Use any column name.
598            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
599            // Use any column name.
600            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
601          }
602          mutator.mutate(put);
603        }
604
605        mutator.flush();
606      }
607
608      private void startWalkers(int numWalkers, Configuration conf, Context context) {
609        LOG.info("Starting " + numWalkers + " concurrent walkers");
610        for (int i = 0; i < numWalkers; i++) {
611          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
612          walker.start();
613          walkers.add(walker);
614        }
615      }
616
617      private void joinWalkers() {
618        walkersStop = true;
619        synchronized (flushedLoops) {
620          flushedLoops.notifyAll();
621        }
622        for (Thread walker : walkers) {
623          try {
624            walker.join();
625          } catch (InterruptedException e) {
626            // no-op
627          }
628        }
629      }
630
631      /**
632       * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by
633       * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are
634       * configured to only log erroneous nodes.
635       */
636
637      public class ContinuousConcurrentWalker implements Runnable {
638
639        ConcurrentWalker walker;
640        Configuration conf;
641        Context context;
642        Random rand;
643
644        public ContinuousConcurrentWalker(Configuration conf, Context context) {
645          this.conf = conf;
646          this.context = context;
647          rand = new Random();
648        }
649
650        @Override
651        public void run() {
652          while (!walkersStop) {
653            try {
654              long node = selectLoop();
655              try {
656                walkLoop(node);
657              } catch (IOException e) {
658                context.getCounter(Counts.IOEXCEPTION).increment(1l);
659                return;
660              }
661            } catch (InterruptedException e) {
662              return;
663            }
664          }
665        }
666
667        private void walkLoop(long node) throws IOException {
668          walker = new ConcurrentWalker(context);
669          walker.setConf(conf);
670          walker.run(node, wrap);
671        }
672
673        private long selectLoop () throws InterruptedException{
674          synchronized (flushedLoops) {
675            while (flushedLoops.isEmpty() && !walkersStop) {
676              flushedLoops.wait();
677            }
678            if (walkersStop) {
679              throw new InterruptedException();
680            }
681            return flushedLoops.get(rand.nextInt(flushedLoops.size()));
682          }
683        }
684      }
685
686      public static class ConcurrentWalker extends WalkerBase {
687
688        Context context;
689
690        public ConcurrentWalker(Context context) {this.context = context;}
691
692        public void run(long startKeyIn, long maxQueriesIn) throws IOException {
693
694          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
695          byte[] startKey = Bytes.toBytes(startKeyIn);
696
697          Connection connection = ConnectionFactory.createConnection(getConf());
698          Table table = connection.getTable(getTableName(getConf()));
699          long numQueries = 0;
700          // If isSpecificStart is set, only walk one list from that particular node.
701          // Note that in case of circular (or P-shaped) list it will walk forever, as is
702          // the case in normal run without startKey.
703
704          CINode node = findStartNode(table, startKey);
705          if (node == null) {
706            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
707            throw new IOException("Start node not found: " + startKeyIn);
708          }
709          while (numQueries < maxQueries) {
710            numQueries++;
711            byte[] prev = node.prev;
712            long t1 = System.currentTimeMillis();
713            node = getNode(prev, table, node);
714            long t2 = System.currentTimeMillis();
715            if (node == null) {
716              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
717              context.getCounter(Counts.UNDEFINED).increment(1l);
718            } else if (node.prev.length == NO_KEY.length) {
719              LOG.error("ConcurrentWalker found TERMINATING NODE: " +
720                  Bytes.toStringBinary(node.key));
721              context.getCounter(Counts.TERMINATING).increment(1l);
722            } else {
723              // Increment for successful walk
724              context.getCounter(Counts.SUCCESS).increment(1l);
725            }
726          }
727          table.close();
728          connection.close();
729        }
730      }
731    }
732
733    @Override
734    public int run(String[] args) throws Exception {
735      if (args.length < 3) {
736        System.err.println(USAGE);
737        return 1;
738      }
739      try {
740        int numMappers = Integer.parseInt(args[0]);
741        long numNodes = Long.parseLong(args[1]);
742        Path tmpOutput = new Path(args[2]);
743        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
744        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
745        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
746        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
747      } catch (NumberFormatException e) {
748        System.err.println("Parsing generator arguments failed: " + e.getMessage());
749        System.err.println(USAGE);
750        return 1;
751      }
752    }
753
754    protected void createSchema() throws IOException {
755      Configuration conf = getConf();
756      TableName tableName = getTableName(conf);
757      try (Connection conn = ConnectionFactory.createConnection(conf);
758          Admin admin = conn.getAdmin()) {
759        if (!admin.tableExists(tableName)) {
760          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
761          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
762          // Always add these families. Just skip writing to them when we do not test per CF flush.
763          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
764          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
765          // if -DuseMob=true force all data through mob path.
766          if (conf.getBoolean("useMob", false)) {
767            for (HColumnDescriptor hcd : htd.getColumnFamilies() ) {
768              hcd.setMobEnabled(true);
769              hcd.setMobThreshold(4);
770            }
771          }
772
773          // If we want to pre-split compute how many splits.
774          if (conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
775              HBaseTestingUtility.PRESPLIT_TEST_TABLE)) {
776            int numberOfServers = admin.getRegionServers().size();
777            if (numberOfServers == 0) {
778              throw new IllegalStateException("No live regionservers");
779            }
780            int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
781                HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
782            int totalNumberOfRegions = numberOfServers * regionsPerServer;
783            LOG.info("Number of live regionservers: " + numberOfServers + ", " +
784                "pre-splitting table into " + totalNumberOfRegions + " regions " +
785                "(default regions per server: " + regionsPerServer + ")");
786
787
788            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
789
790            admin.createTable(htd, splits);
791          } else {
792            // Looks like we're just letting things play out.
793            // Create a table with on region by default.
794            // This will make the splitting work hard.
795            admin.createTable(htd);
796          }
797        }
798      } catch (MasterNotRunningException e) {
799        LOG.error("Master not running", e);
800        throw new IOException(e);
801      }
802    }
803
804    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
805        Integer width, Integer wrapMultiplier, Integer numWalkers)
806        throws Exception {
807      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
808          + ", numNodes=" + numNodes);
809      Job job = Job.getInstance(getConf());
810
811      job.setJobName("Random Input Generator");
812      job.setNumReduceTasks(0);
813      job.setJarByClass(getClass());
814
815      job.setInputFormatClass(GeneratorInputFormat.class);
816      job.setOutputKeyClass(BytesWritable.class);
817      job.setOutputValueClass(NullWritable.class);
818
819      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
820
821      job.setMapperClass(Mapper.class); //identity mapper
822
823      FileOutputFormat.setOutputPath(job, tmpOutput);
824      job.setOutputFormatClass(SequenceFileOutputFormat.class);
825      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
826
827      boolean success = jobCompletion(job);
828
829      return success ? 0 : 1;
830    }
831
832    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
833        Integer width, Integer wrapMultiplier, Integer numWalkers)
834        throws Exception {
835      LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
836      createSchema();
837      job = Job.getInstance(getConf());
838
839      job.setJobName("Link Generator");
840      job.setNumReduceTasks(0);
841      job.setJarByClass(getClass());
842
843      FileInputFormat.setInputPaths(job, tmpOutput);
844      job.setInputFormatClass(OneFilePerMapperSFIF.class);
845      job.setOutputKeyClass(NullWritable.class);
846      job.setOutputValueClass(NullWritable.class);
847
848      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
849
850      setMapperForGenerator(job);
851
852      job.setOutputFormatClass(NullOutputFormat.class);
853
854      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
855      TableMapReduceUtil.addDependencyJars(job);
856      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
857                                                     AbstractHBaseTool.class);
858      TableMapReduceUtil.initCredentials(job);
859
860      boolean success = jobCompletion(job);
861
862      return success ? 0 : 1;
863    }
864
865    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
866        ClassNotFoundException {
867      boolean success = job.waitForCompletion(true);
868      return success;
869    }
870
    /**
     * Installs {@code GeneratorMapper} as the mapper of the given job. Called by
     * {@code runGenerator} while it configures the "Link Generator" job.
     *
     * @param job the job being configured
     */
    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(GeneratorMapper.class);
    }
874
875    public int run(int numMappers, long numNodes, Path tmpOutput,
876        Integer width, Integer wrapMultiplier, Integer numWalkers)
877        throws Exception {
878      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier,
879          numWalkers);
880      if (ret > 0) {
881        return ret;
882      }
883      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
884    }
885
886    public boolean verify() {
887      try {
888        Counters counters = job.getCounters();
889        if (counters == null) {
890          LOG.info("Counters object was null, Generator verification cannot be performed."
891              + " This is commonly a result of insufficient YARN configuration.");
892          return false;
893        }
894
895        if (counters.findCounter(Counts.TERMINATING).getValue() > 0 ||
896            counters.findCounter(Counts.UNDEFINED).getValue() > 0 ||
897            counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) {
898          LOG.error("Concurrent walker failed to verify during Generation phase");
899          LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue());
900          LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue());
901          LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue());
902          return false;
903        }
904      } catch (IOException e) {
905        LOG.info("Generator verification could not find counter");
906        return false;
907      }
908      return true;
909    }
910  }
911
912  /**
913   * Tool to search missing rows in WALs and hfiles.
914   * Pass in file or dir of keys to search for. Key file must have been written by Verify step
915   * (we depend on the format it writes out. We'll read them in and then search in hbase
916   * WALs and oldWALs dirs (Some of this is TODO).
917   */
918  static class Search extends Configured implements Tool {
919    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
920    protected Job job;
921
922    private static void printUsage(final String error) {
923      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
924      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
925    }
926
927    @Override
928    public int run(String[] args) throws Exception {
929      if (args.length < 1 || args.length > 2) {
930        printUsage(null);
931        return 1;
932      }
933      Path inputDir = new Path(args[0]);
934      int numMappers = 1;
935      if (args.length > 1) {
936        numMappers = Integer.parseInt(args[1]);
937      }
938      return run(inputDir, numMappers);
939    }
940
941    /**
942     * WALPlayer override that searches for keys loaded in the setup.
943     */
944    public static class WALSearcher extends WALPlayer {
945      public WALSearcher(Configuration conf) {
946        super(conf);
947      }
948
949      /**
950       * The actual searcher mapper.
951       */
952      public static class WALMapperSearcher extends WALMapper {
953        private SortedSet<byte []> keysToFind;
954        private AtomicInteger rows = new AtomicInteger(0);
955
956        @Override
957        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
958            throws IOException {
959          super.setup(context);
960          try {
961            this.keysToFind = readKeysToSearch(context.getConfiguration());
962            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
963          } catch (InterruptedException e) {
964            throw new InterruptedIOException(e.toString());
965          }
966        }
967
968        @Override
969        protected boolean filter(Context context, Cell cell) {
970          // TODO: Can I do a better compare than this copying out key?
971          byte [] row = new byte [cell.getRowLength()];
972          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
973          boolean b = this.keysToFind.contains(row);
974          if (b) {
975            String keyStr = Bytes.toStringBinary(row);
976            try {
977              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
978            } catch (IOException|InterruptedException e) {
979              LOG.warn(e.toString(), e);
980            }
981            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
982              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
983            }
984            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
985          }
986          return b;
987        }
988      }
989
990      // Put in place the above WALMapperSearcher.
991      @Override
992      public Job createSubmittableJob(String[] args) throws IOException {
993        Job job = super.createSubmittableJob(args);
994        // Call my class instead.
995        job.setJarByClass(WALMapperSearcher.class);
996        job.setMapperClass(WALMapperSearcher.class);
997        job.setOutputFormatClass(NullOutputFormat.class);
998        return job;
999      }
1000    }
1001
1002    static final String FOUND_GROUP_KEY = "Found";
1003    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
1004
1005    public int run(Path inputDir, int numMappers) throws Exception {
1006      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
1007      SortedSet<byte []> keys = readKeysToSearch(getConf());
1008      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
1009      LOG.info("Count of keys to find: " + keys.size());
1010      for(byte [] key: keys)  LOG.info("Key: " + Bytes.toStringBinary(key));
1011      // Now read all WALs. In two dirs. Presumes certain layout.
1012      Path walsDir = new Path(
1013          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
1014      Path oldWalsDir = new Path(
1015          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
1016      LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
1017        " against " + getConf().get(HConstants.HBASE_DIR));
1018      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
1019          new String [] {walsDir.toString(), ""});
1020      if (ret != 0) {
1021        return ret;
1022      }
1023      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
1024          new String [] {oldWalsDir.toString(), ""});
1025    }
1026
1027    static SortedSet<byte []> readKeysToSearch(final Configuration conf)
1028    throws IOException, InterruptedException {
1029      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
1030      FileSystem fs = FileSystem.get(conf);
1031      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1032      if (!fs.exists(keysInputDir)) {
1033        throw new FileNotFoundException(keysInputDir.toString());
1034      }
1035      if (!fs.isDirectory(keysInputDir)) {
1036        throw new UnsupportedOperationException("TODO");
1037      } else {
1038        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
1039        while(iterator.hasNext()) {
1040          LocatedFileStatus keyFileStatus = iterator.next();
1041          // Skip "_SUCCESS" file.
1042          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
1043          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
1044        }
1045      }
1046      return result;
1047    }
1048
1049    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
1050        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
1051        InterruptedException {
1052      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1053      // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
1054      // what is missing.
1055      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
1056      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
1057          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
1058        InputSplit is =
1059          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
1060        rr.initialize(is, context);
1061        while (rr.nextKeyValue()) {
1062          rr.getCurrentKey();
1063          BytesWritable bw = rr.getCurrentValue();
1064          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
1065            byte[] key = new byte[rr.getCurrentKey().getLength()];
1066            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
1067                .getLength());
1068            result.add(key);
1069          }
1070        }
1071      }
1072      return result;
1073    }
1074  }
1075
1076  /**
1077   * A Map Reduce job that verifies that the linked lists generated by
1078   * {@link Generator} do not have any holes.
1079   */
1080  static class Verify extends Configured implements Tool {
1081
1082    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
1083    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
1084    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
1085
1086    protected Job job;
1087
1088    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
1089      private BytesWritable row = new BytesWritable();
1090      private BytesWritable ref = new BytesWritable();
1091
1092      private boolean multipleUnevenColumnFamilies;
1093
1094      @Override
1095      protected void setup(
1096          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
1097          throws IOException, InterruptedException {
1098        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
1099      }
1100
1101      @Override
1102      protected void map(ImmutableBytesWritable key, Result value, Context context)
1103          throws IOException ,InterruptedException {
1104        byte[] rowKey = key.get();
1105        row.set(rowKey, 0, rowKey.length);
1106        if (multipleUnevenColumnFamilies
1107            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
1108              TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
1109          context.write(row, DEF_LOST_FAMILIES);
1110        } else {
1111          context.write(row, DEF);
1112        }
1113        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
1114        if (prev != null && prev.length > 0) {
1115          ref.set(prev, 0, prev.length);
1116          context.write(ref, row);
1117        } else {
1118          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
1119        }
1120      }
1121    }
1122
1123    /**
1124     * Don't change the order of these enums. Their ordinals are used as type flag when we emit
1125     * problems found from the reducer.
1126     */
1127    public static enum Counts {
1128      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
1129      LOST_FAMILIES
1130    }
1131
1132    /**
1133     * Per reducer, we output problem rows as byte arrasy so can be used as input for
1134     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag
1135     * saying what sort of emission it is. Flag is the Count enum ordinal as a short.
1136     */
1137    public static class VerifyReducer extends
1138        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
1139      private ArrayList<byte[]> refs = new ArrayList<>();
1140      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
1141        Counts.UNREFERENCED.ordinal(), new byte[] {}));
1142      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
1143        Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
1144
1145      private AtomicInteger rows = new AtomicInteger(0);
1146      private Connection connection;
1147
1148      @Override
1149      protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1150      throws IOException, InterruptedException {
1151        super.setup(context);
1152        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
1153      }
1154
1155      @Override
1156      protected void cleanup(
1157          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1158          throws IOException, InterruptedException {
1159        if (this.connection != null) {
1160          this.connection.close();
1161        }
1162        super.cleanup(context);
1163      }
1164
1165      /**
1166       * @param ordinal
1167       * @param r
1168       * @return Return new byte array that has <code>ordinal</code> as prefix on front taking up
1169       * Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
1170       */
1171      public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
1172        byte[] prefix = Bytes.toBytes((short)ordinal);
1173        if (prefix.length != Bytes.SIZEOF_SHORT) {
1174          throw new RuntimeException("Unexpected size: " + prefix.length);
1175        }
1176        byte[] result = new byte[prefix.length + r.length];
1177        System.arraycopy(prefix, 0, result, 0, prefix.length);
1178        System.arraycopy(r, 0, result, prefix.length, r.length);
1179        return result;
1180      }
1181
1182      /**
1183       * @param bs
1184       * @return Type from the Counts enum of this row. Reads prefix added by
1185       * {@link #addPrefixFlag(int, byte[])}
1186       */
1187      public static Counts whichType(final byte [] bs) {
1188        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
1189        return Counts.values()[ordinal];
1190      }
1191
1192      /**
1193       * @param bw
1194       * @return Row bytes minus the type flag.
1195       */
1196      public static byte[] getRowOnly(BytesWritable bw) {
1197        byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
1198        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
1199        return bytes;
1200      }
1201
1202      @Override
1203      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
1204          throws IOException, InterruptedException {
1205        int defCount = 0;
1206        boolean lostFamilies = false;
1207        refs.clear();
1208        for (BytesWritable type : values) {
1209          if (type.getLength() == DEF.getLength()) {
1210            defCount++;
1211            if (type.getBytes()[0] == 1) {
1212              lostFamilies = true;
1213            }
1214          } else {
1215            byte[] bytes = new byte[type.getLength()];
1216            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
1217            refs.add(bytes);
1218          }
1219        }
1220
1221        // TODO check for more than one def, should not happen
1222        StringBuilder refsSb = null;
1223        if (defCount == 0 || refs.size() != 1) {
1224          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1225          refsSb = dumpExtraInfoOnRefs(key, context, refs);
1226          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
1227            (refsSb != null? refsSb.toString(): ""));
1228        }
1229        if (lostFamilies) {
1230          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1231          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
1232          context.getCounter(Counts.LOST_FAMILIES).increment(1);
1233          context.write(key, LOSTFAM);
1234        }
1235
1236        if (defCount == 0 && refs.size() > 0) {
1237          // This is bad, found a node that is referenced but not defined. It must have been
1238          // lost, emit some info about this node for debugging purposes.
1239          // Write out a line per reference. If more than one, flag it.;
1240          for (int i = 0; i < refs.size(); i++) {
1241            byte[] bs = refs.get(i);
1242            int ordinal;
1243            if (i <= 0) {
1244              ordinal = Counts.UNDEFINED.ordinal();
1245              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1246              context.getCounter(Counts.UNDEFINED).increment(1);
1247            } else {
1248              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
1249              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1250            }
1251          }
1252          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1253            // Print out missing row; doing get on reference gives info on when the referencer
1254            // was added which can help a little debugging. This info is only available in mapper
1255            // output -- the 'Linked List error Key...' log message above. What we emit here is
1256            // useless for debugging.
1257            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1258            context.getCounter("undef", keyString).increment(1);
1259          }
1260        } else if (defCount > 0 && refs.isEmpty()) {
1261          // node is defined but not referenced
1262          context.write(key, UNREF);
1263          context.getCounter(Counts.UNREFERENCED).increment(1);
1264          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1265            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1266            context.getCounter("unref", keyString).increment(1);
1267          }
1268        } else {
1269          if (refs.size() > 1) {
1270            // Skip first reference.
1271            for (int i = 1; i < refs.size(); i++) {
1272              context.write(key,
1273                new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
1274            }
1275            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
1276          }
1277          // node is defined and referenced
1278          context.getCounter(Counts.REFERENCED).increment(1);
1279        }
1280      }
1281
1282      /**
1283       * Dump out extra info around references if there are any. Helps debugging.
1284       * @return StringBuilder filled with references if any.
1285       * @throws IOException
1286       */
1287      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1288          final List<byte []> refs)
1289      throws IOException {
1290        StringBuilder refsSb = null;
1291        if (refs.isEmpty()) return refsSb;
1292        refsSb = new StringBuilder();
1293        String comma = "";
1294        // If a row is a reference but has no define, print the content of the row that has
1295        // this row as a 'prev'; it will help debug.  The missing row was written just before
1296        // the row we are dumping out here.
1297        TableName tn = getTableName(context.getConfiguration());
1298        try (Table t = this.connection.getTable(tn)) {
1299          for (byte [] ref : refs) {
1300            Result r = t.get(new Get(ref));
1301            List<Cell> cells = r.listCells();
1302            String ts = (cells != null && !cells.isEmpty())?
1303                new java.util.Date(cells.get(0).getTimestamp()).toString(): "";
1304            byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1305            String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
1306            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1307            long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
1308            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1309            String refRegionLocation = "";
1310            String keyRegionLocation = "";
1311            if (b != null && b.length > 0) {
1312              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1313                HRegionLocation hrl = rl.getRegionLocation(b);
1314                if (hrl != null) refRegionLocation = hrl.toString();
1315                // Key here probably has trailing zeros on it.
1316                hrl = rl.getRegionLocation(key.getBytes());
1317                if (hrl != null) keyRegionLocation = hrl.toString();
1318              }
1319            }
1320            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
1321              ", refPrevEqualsKey=" +
1322                (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
1323                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
1324                ", ref row date=" + ts + ", jobStr=" + jobStr +
1325                ", ref row count=" + count +
1326                ", ref row regionLocation=" + refRegionLocation +
1327                ", key row regionLocation=" + keyRegionLocation);
1328            refsSb.append(comma);
1329            comma = ",";
1330            refsSb.append(Bytes.toStringBinary(ref));
1331          }
1332        }
1333        return refsSb;
1334      }
1335    }
1336
1337    @Override
1338    public int run(String[] args) throws Exception {
1339      if (args.length != 2) {
1340        System.out.println("Usage : " + Verify.class.getSimpleName()
1341            + " <output dir> <num reducers>");
1342        return 0;
1343      }
1344
1345      String outputDir = args[0];
1346      int numReducers = Integer.parseInt(args[1]);
1347
1348      return run(outputDir, numReducers);
1349    }
1350
1351    public int run(String outputDir, int numReducers) throws Exception {
1352      return run(new Path(outputDir), numReducers);
1353    }
1354
1355    public int run(Path outputDir, int numReducers) throws Exception {
1356      LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);
1357
1358      job = Job.getInstance(getConf());
1359
1360      job.setJobName("Link Verifier");
1361      job.setNumReduceTasks(numReducers);
1362      job.setJarByClass(getClass());
1363
1364      setJobScannerConf(job);
1365
1366      Scan scan = new Scan();
1367      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1368      scan.setCaching(10000);
1369      scan.setCacheBlocks(false);
1370      if (isMultiUnevenColumnFamilies(getConf())) {
1371        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1372        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1373      }
1374
1375      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1376          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1377      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
1378                                                     AbstractHBaseTool.class);
1379
1380      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1381
1382      job.setReducerClass(VerifyReducer.class);
1383      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1384      job.setOutputKeyClass(BytesWritable.class);
1385      job.setOutputValueClass(BytesWritable.class);
1386      TextOutputFormat.setOutputPath(job, outputDir);
1387
1388      boolean success = job.waitForCompletion(true);
1389
1390      if (success) {
1391        Counters counters = job.getCounters();
1392        if (null == counters) {
1393          LOG.warn("Counters were null, cannot verify Job completion."
1394              + " This is commonly a result of insufficient YARN configuration.");
1395          // We don't have access to the counters to know if we have "bad" counts
1396          return 0;
1397        }
1398
1399        // If we find no unexpected values, the job didn't outright fail
1400        if (verifyUnexpectedValues(counters)) {
1401          // We didn't check referenced+unreferenced counts, leave that to visual inspection
1402          return 0;
1403        }
1404      }
1405
1406      // We failed
1407      return 1;
1408    }
1409
1410    public boolean verify(long expectedReferenced) throws Exception {
1411      if (job == null) {
1412        throw new IllegalStateException("You should call run() first");
1413      }
1414
1415      Counters counters = job.getCounters();
1416      if (counters == null) {
1417        LOG.info("Counters object was null, write verification cannot be performed."
1418              + " This is commonly a result of insufficient YARN configuration.");
1419        return false;
1420      }
1421
1422      // Run through each check, even if we fail one early
1423      boolean success = verifyExpectedValues(expectedReferenced, counters);
1424
1425      if (!verifyUnexpectedValues(counters)) {
1426        // We found counter objects which imply failure
1427        success = false;
1428      }
1429
1430      if (!success) {
1431        handleFailure(counters);
1432      }
1433      return success;
1434    }
1435
1436    /**
1437     * Verify the values in the Counters against the expected number of entries written.
1438     *
1439     * @param expectedReferenced
1440     *          Expected number of referenced entrires
1441     * @param counters
1442     *          The Job's Counters object
1443     * @return True if the values match what's expected, false otherwise
1444     */
1445    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1446      final Counter referenced = counters.findCounter(Counts.REFERENCED);
1447      final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
1448      boolean success = true;
1449
1450      if (expectedReferenced != referenced.getValue()) {
1451        LOG.error("Expected referenced count does not match with actual referenced count. " +
1452            "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
1453        success = false;
1454      }
1455
1456      if (unreferenced.getValue() > 0) {
1457        final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
1458        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
1459        LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
1460            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1461        success = false;
1462      }
1463
1464      return success;
1465    }
1466
1467    /**
1468     * Verify that the Counters don't contain values which indicate an outright failure from the Reducers.
1469     *
1470     * @param counters
1471     *          The Job's counters
1472     * @return True if the "bad" counter objects are 0, false otherwise
1473     */
1474    protected boolean verifyUnexpectedValues(Counters counters) {
1475      final Counter undefined = counters.findCounter(Counts.UNDEFINED);
1476      final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
1477      boolean success = true;
1478
1479      if (undefined.getValue() > 0) {
1480        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1481        success = false;
1482      }
1483
1484      if (lostfamilies.getValue() > 0) {
1485        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1486        success = false;
1487      }
1488
1489      return success;
1490    }
1491
1492    protected void handleFailure(Counters counters) throws IOException {
1493      Configuration conf = job.getConfiguration();
1494      TableName tableName = getTableName(conf);
1495      try (Connection conn = ConnectionFactory.createConnection(conf)) {
1496        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
1497          CounterGroup g = counters.getGroup("undef");
1498          Iterator<Counter> it = g.iterator();
1499          while (it.hasNext()) {
1500            String keyString = it.next().getName();
1501            byte[] key = Bytes.toBytes(keyString);
1502            HRegionLocation loc = rl.getRegionLocation(key, true);
1503            LOG.error("undefined row " + keyString + ", " + loc);
1504          }
1505          g = counters.getGroup("unref");
1506          it = g.iterator();
1507          while (it.hasNext()) {
1508            String keyString = it.next().getName();
1509            byte[] key = Bytes.toBytes(keyString);
1510            HRegionLocation loc = rl.getRegionLocation(key, true);
1511            LOG.error("unreferred row " + keyString + ", " + loc);
1512          }
1513        }
1514      }
1515    }
1516  }
1517
1518  /**
1519   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
1520   * adds more data.
1521   */
1522  static class Loop extends Configured implements Tool {
1523
1524    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
1525    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " +
1526        "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" +
1527        " <num walker threads>] \n" +
1528        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
1529        "walkers will select and verify random flushed loop during Generation.";
1530
1531    IntegrationTestBigLinkedList it;
1532
1533    protected void runGenerator(int numMappers, long numNodes,
1534        String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers)
1535        throws Exception {
1536      Path outputPath = new Path(outputDir);
1537      UUID uuid = UUID.randomUUID(); //create a random UUID.
1538      Path generatorOutput = new Path(outputPath, uuid.toString());
1539
1540      Generator generator = new Generator();
1541      generator.setConf(getConf());
1542      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
1543          numWalkers);
1544      if (retCode > 0) {
1545        throw new RuntimeException("Generator failed with return code: " + retCode);
1546      }
1547      if (numWalkers > 0) {
1548        if (!generator.verify()) {
1549          throw new RuntimeException("Generator.verify failed");
1550        }
1551      }
1552    }
1553
1554    protected void runVerify(String outputDir,
1555        int numReducers, long expectedNumNodes) throws Exception {
1556      Path outputPath = new Path(outputDir);
1557      UUID uuid = UUID.randomUUID(); //create a random UUID.
1558      Path iterationOutput = new Path(outputPath, uuid.toString());
1559
1560      Verify verify = new Verify();
1561      verify.setConf(getConf());
1562      int retCode = verify.run(iterationOutput, numReducers);
1563      if (retCode > 0) {
1564        throw new RuntimeException("Verify.run failed with return code: " + retCode);
1565      }
1566
1567      if (!verify.verify(expectedNumNodes)) {
1568        throw new RuntimeException("Verify.verify failed");
1569      }
1570      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
1571    }
1572
1573    @Override
1574    public int run(String[] args) throws Exception {
1575      if (args.length < 5) {
1576        System.err.println(USAGE);
1577        return 1;
1578      }
1579      try {
1580        int numIterations = Integer.parseInt(args[0]);
1581        int numMappers = Integer.parseInt(args[1]);
1582        long numNodes = Long.parseLong(args[2]);
1583        String outputDir = args[3];
1584        int numReducers = Integer.parseInt(args[4]);
1585        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1586        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1587        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
1588
1589        long expectedNumNodes = 0;
1590
1591        if (numIterations < 0) {
1592          numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
1593        }
1594        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
1595        for (int i = 0; i < numIterations; i++) {
1596          LOG.info("Starting iteration = " + i);
1597          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
1598          expectedNumNodes += numMappers * numNodes;
1599          runVerify(outputDir, numReducers, expectedNumNodes);
1600        }
1601        return 0;
1602      } catch (NumberFormatException e) {
1603        System.err.println("Parsing loop arguments failed: " + e.getMessage());
1604        System.err.println(USAGE);
1605        return 1;
1606      }
1607    }
1608  }
1609
1610  /**
1611   * A stand alone program that prints out portions of a list created by {@link Generator}
1612   */
1613  private static class Print extends Configured implements Tool {
1614    @Override
1615    public int run(String[] args) throws Exception {
1616      Options options = new Options();
1617      options.addOption("s", "start", true, "start key");
1618      options.addOption("e", "end", true, "end key");
1619      options.addOption("l", "limit", true, "number to print");
1620
1621      GnuParser parser = new GnuParser();
1622      CommandLine cmd = null;
1623      try {
1624        cmd = parser.parse(options, args);
1625        if (cmd.getArgs().length != 0) {
1626          throw new ParseException("Command takes no arguments");
1627        }
1628      } catch (ParseException e) {
1629        System.err.println("Failed to parse command line " + e.getMessage());
1630        System.err.println();
1631        HelpFormatter formatter = new HelpFormatter();
1632        formatter.printHelp(getClass().getSimpleName(), options);
1633        System.exit(-1);
1634      }
1635
1636      Connection connection = ConnectionFactory.createConnection(getConf());
1637      Table table = connection.getTable(getTableName(getConf()));
1638
1639      Scan scan = new Scan();
1640      scan.setBatch(10000);
1641
1642      if (cmd.hasOption("s"))
1643        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
1644
1645      if (cmd.hasOption("e"))
1646        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
1647
1648      int limit = 0;
1649      if (cmd.hasOption("l"))
1650        limit = Integer.parseInt(cmd.getOptionValue("l"));
1651      else
1652        limit = 100;
1653
1654      ResultScanner scanner = table.getScanner(scan);
1655
1656      CINode node = new CINode();
1657      Result result = scanner.next();
1658      int count = 0;
1659      while (result != null && count++ < limit) {
1660        node = getCINode(result, node);
1661        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
1662            Bytes.toStringBinary(node.prev), node.count, node.client);
1663        result = scanner.next();
1664      }
1665      scanner.close();
1666      table.close();
1667      connection.close();
1668
1669      return 0;
1670    }
1671  }
1672
1673  /**
1674   * A stand alone program that deletes a single node.
1675   */
1676  private static class Delete extends Configured implements Tool {
1677    @Override
1678    public int run(String[] args) throws Exception {
1679      if (args.length != 1) {
1680        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
1681        return 0;
1682      }
1683      byte[] val = Bytes.toBytesBinary(args[0]);
1684
1685      org.apache.hadoop.hbase.client.Delete delete
1686        = new org.apache.hadoop.hbase.client.Delete(val);
1687
1688      try (Connection connection = ConnectionFactory.createConnection(getConf());
1689          Table table = connection.getTable(getTableName(getConf()))) {
1690        table.delete(delete);
1691      }
1692
1693      System.out.println("Delete successful");
1694      return 0;
1695    }
1696  }
1697
1698  abstract static class WalkerBase extends Configured{
1699    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
1700      Scan scan = new Scan();
1701      scan.setStartRow(startKey);
1702      scan.setBatch(1);
1703      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1704
1705      long t1 = System.currentTimeMillis();
1706      ResultScanner scanner = table.getScanner(scan);
1707      Result result = scanner.next();
1708      long t2 = System.currentTimeMillis();
1709      scanner.close();
1710
1711      if ( result != null) {
1712        CINode node = getCINode(result, new CINode());
1713        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1714        return node;
1715      }
1716
1717      System.out.println("FSR " + (t2 - t1));
1718
1719      return null;
1720    }
1721    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1722      Get get = new Get(row);
1723      get.addColumn(FAMILY_NAME, COLUMN_PREV);
1724      Result result = table.get(get);
1725      return getCINode(result, node);
1726    }
1727  }
1728  /**
1729   * A stand alone program that follows a linked list created by {@link Generator} and prints
1730   * timing info.
1731   */
1732  private static class Walker extends WalkerBase implements Tool {
1733
1734    public Walker(){}
1735
1736    @Override
1737    public int run(String[] args) throws IOException {
1738
1739      Options options = new Options();
1740      options.addOption("n", "num", true, "number of queries");
1741      options.addOption("s", "start", true, "key to start at, binary string");
1742      options.addOption("l", "logevery", true, "log every N queries");
1743
1744      GnuParser parser = new GnuParser();
1745      CommandLine cmd = null;
1746      try {
1747        cmd = parser.parse(options, args);
1748        if (cmd.getArgs().length != 0) {
1749          throw new ParseException("Command takes no arguments");
1750        }
1751      } catch (ParseException e) {
1752        System.err.println("Failed to parse command line " + e.getMessage());
1753        System.err.println();
1754        HelpFormatter formatter = new HelpFormatter();
1755        formatter.printHelp(getClass().getSimpleName(), options);
1756        System.exit(-1);
1757      }
1758
1759      long maxQueries = Long.MAX_VALUE;
1760      if (cmd.hasOption('n')) {
1761        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1762      }
1763      Random rand = new SecureRandom();
1764      boolean isSpecificStart = cmd.hasOption('s');
1765
1766      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1767      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1768
1769      Connection connection = ConnectionFactory.createConnection(getConf());
1770      Table table = connection.getTable(getTableName(getConf()));
1771      long numQueries = 0;
1772      // If isSpecificStart is set, only walk one list from that particular node.
1773      // Note that in case of circular (or P-shaped) list it will walk forever, as is
1774      // the case in normal run without startKey.
1775      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1776        if (!isSpecificStart) {
1777          startKey = new byte[ROWKEY_LENGTH];
1778          rand.nextBytes(startKey);
1779        }
1780        CINode node = findStartNode(table, startKey);
1781        if (node == null && isSpecificStart) {
1782          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1783        }
1784        numQueries++;
1785        while (node != null && node.prev.length != NO_KEY.length &&
1786            numQueries < maxQueries) {
1787          byte[] prev = node.prev;
1788          long t1 = System.currentTimeMillis();
1789          node = getNode(prev, table, node);
1790          long t2 = System.currentTimeMillis();
1791          if (logEvery > 0 && numQueries % logEvery == 0) {
1792            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1793          }
1794          numQueries++;
1795          if (node == null) {
1796            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1797          } else if (node.prev.length == NO_KEY.length) {
1798            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1799          }
1800        }
1801      }
1802      table.close();
1803      connection.close();
1804      return 0;
1805    }
1806  }
1807
1808  private static class Clean extends Configured implements Tool {
1809    @Override public int run(String[] args) throws Exception {
1810      if (args.length < 1) {
1811        System.err.println("Usage: Clean <output dir>");
1812        return -1;
1813      }
1814
1815      Path p = new Path(args[0]);
1816      Configuration conf = getConf();
1817      TableName tableName = getTableName(conf);
1818      try (FileSystem fs = HFileSystem.get(conf);
1819          Connection conn = ConnectionFactory.createConnection(conf);
1820          Admin admin = conn.getAdmin()) {
1821        if (admin.tableExists(tableName)) {
1822          admin.disableTable(tableName);
1823          admin.deleteTable(tableName);
1824        }
1825
1826        if (fs.exists(p)) {
1827          fs.delete(p, true);
1828        }
1829      }
1830
1831      return 0;
1832    }
1833  }
1834
1835  static TableName getTableName(Configuration conf) {
1836    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1837  }
1838
1839  private static CINode getCINode(Result result, CINode node) {
1840    node.key = Bytes.copy(result.getRow());
1841    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1842      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1843    } else {
1844      node.prev = NO_KEY;
1845    }
1846    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1847      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1848    } else {
1849      node.count = -1;
1850    }
1851    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1852      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1853    } else {
1854      node.client = "";
1855    }
1856    return node;
1857  }
1858
  /** Testing utility for the cluster under test; assigned in {@link #setUpCluster()}. */
  protected IntegrationTestingUtility util;
1860
1861  @Override
1862  public void setUpCluster() throws Exception {
1863    util = getTestingUtil(getConf());
1864    boolean isDistributed = util.isDistributedCluster();
1865    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1866    if (!isDistributed) {
1867      util.startMiniMapReduceCluster();
1868    }
1869    this.setConf(util.getConfiguration());
1870  }
1871
1872  @Override
1873  public void cleanUpCluster() throws Exception {
1874    super.cleanUpCluster();
1875    if (util.isDistributedCluster()) {
1876      util.shutdownMiniMapReduceCluster();
1877    }
1878  }
1879
1880  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
1881    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,true);
1882  }
1883
1884  @Test
1885  public void testContinuousIngest() throws IOException, Exception {
1886    //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1887    Configuration conf = getTestingUtil(getConf()).getConfiguration();
1888    if (isMultiUnevenColumnFamilies(getConf())) {
1889      // make sure per CF flush is on
1890      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
1891    }
1892    int ret =
1893        ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1894            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1895    org.junit.Assert.assertEquals(0, ret);
1896  }
1897
1898  private void usage() {
1899    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1900    printCommands();
1901  }
1902
1903  private void printCommands() {
1904    System.err.println("Commands:");
1905    System.err.println(" generator  Map only job that generates data.");
1906    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
1907    System.err.println("            look at the counts after running. See REFERENCED and");
1908    System.err.println("            UNREFERENCED are ok. Any UNDEFINED counts are bad. Do not run");
1909    System.err.println("            with the Generator.");
1910    System.err.println(" walker     " +
1911      "Standalone program that starts following a linked list & emits timing info.");
1912    System.err.println(" print      Standalone program that prints nodes in the linked list.");
1913    System.err.println(" delete     Standalone program that deletes a single node.");
1914    System.err.println(" loop       Program to Loop through Generator and Verify steps");
1915    System.err.println(" clean      Program to clean all left over detritus.");
1916    System.err.println(" search     Search for missing keys.");
1917    System.err.println("");
1918    System.err.println("General options:");
1919    System.err.println(" -D"+ TABLE_NAME_KEY+ "=<tableName>");
1920    System.err.println("    Run using the <tableName> as the tablename.  Defaults to "
1921        + DEFAULT_TABLE_NAME);
1922    System.err.println(" -D"+ HBaseTestingUtility.REGIONS_PER_SERVER_KEY+ "=<# regions>");
1923    System.err.println("    Create table with presplit regions per server.  Defaults to "
1924        + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
1925
1926    System.err.println(" -DuseMob=<true|false>");
1927    System.err.println("    Create table so that the mob read/write path is forced.  " +
1928        "Defaults to false");
1929
1930    System.err.flush();
1931  }
1932
1933  @Override
1934  protected void processOptions(CommandLine cmd) {
1935    super.processOptions(cmd);
1936    String[] args = cmd.getArgs();
1937    //get the class, run with the conf
1938    if (args.length < 1) {
1939      printUsage(this.getClass().getSimpleName() +
1940        " <general options> COMMAND [<COMMAND options>]", "General options:", "");
1941      printCommands();
1942      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
1943      throw new RuntimeException("Incorrect Number of args.");
1944    }
1945    toRun = args[0];
1946    otherArgs = Arrays.copyOfRange(args, 1, args.length);
1947  }
1948
1949  @Override
1950  public int runTestFromCommandLine() throws Exception {
1951    Tool tool = null;
1952    if (toRun.equalsIgnoreCase("Generator")) {
1953      tool = new Generator();
1954    } else if (toRun.equalsIgnoreCase("Verify")) {
1955      tool = new Verify();
1956    } else if (toRun.equalsIgnoreCase("Loop")) {
1957      Loop loop = new Loop();
1958      loop.it = this;
1959      tool = loop;
1960    } else if (toRun.equalsIgnoreCase("Walker")) {
1961      tool = new Walker();
1962    } else if (toRun.equalsIgnoreCase("Print")) {
1963      tool = new Print();
1964    } else if (toRun.equalsIgnoreCase("Delete")) {
1965      tool = new Delete();
1966    } else if (toRun.equalsIgnoreCase("Clean")) {
1967      tool = new Clean();
1968    } else if (toRun.equalsIgnoreCase("Search")) {
1969      tool = new Search();
1970    } else {
1971      usage();
1972      throw new RuntimeException("Unknown arg");
1973    }
1974
1975    return ToolRunner.run(getConf(), tool, otherArgs);
1976  }
1977
1978  @Override
1979  public TableName getTablename() {
1980    Configuration c = getConf();
1981    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1982  }
1983
1984  @Override
1985  protected Set<String> getColumnFamilies() {
1986    if (isMultiUnevenColumnFamilies(getConf())) {
1987      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
1988        Bytes.toString(TINY_FAMILY_NAME));
1989    } else {
1990      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1991    }
1992  }
1993
1994  private static void setJobConf(Job job, int numMappers, long numNodes,
1995      Integer width, Integer wrapMultiplier, Integer numWalkers) {
1996    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1997    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1998    if (width != null) {
1999      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
2000    }
2001    if (wrapMultiplier != null) {
2002      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
2003    }
2004    if (numWalkers != null) {
2005      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
2006    }
2007  }
2008
2009  public static void setJobScannerConf(Job job) {
2010    // Make sure scanners log something useful to make debugging possible.
2011    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
2012    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
2013  }
2014
2015  public static void main(String[] args) throws Exception {
2016    Configuration conf = HBaseConfiguration.create();
2017    IntegrationTestingUtility.setUseDistributedCluster(conf);
2018    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
2019    System.exit(ret);
2020  }
2021}