001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Set;
030import java.util.SortedSet;
031import java.util.TreeSet;
032import java.util.UUID;
033import java.util.concurrent.ThreadLocalRandom;
034import java.util.concurrent.atomic.AtomicInteger;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.conf.Configured;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.LocatedFileStatus;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.fs.RemoteIterator;
041import org.apache.hadoop.hbase.Cell;
042import org.apache.hadoop.hbase.HBaseConfiguration;
043import org.apache.hadoop.hbase.HBaseTestingUtility;
044import org.apache.hadoop.hbase.HColumnDescriptor;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HRegionLocation;
047import org.apache.hadoop.hbase.HTableDescriptor;
048import org.apache.hadoop.hbase.IntegrationTestBase;
049import org.apache.hadoop.hbase.IntegrationTestingUtility;
050import org.apache.hadoop.hbase.MasterNotRunningException;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.Admin;
053import org.apache.hadoop.hbase.client.BufferedMutator;
054import org.apache.hadoop.hbase.client.BufferedMutatorParams;
055import org.apache.hadoop.hbase.client.Connection;
056import org.apache.hadoop.hbase.client.ConnectionConfiguration;
057import org.apache.hadoop.hbase.client.ConnectionFactory;
058import org.apache.hadoop.hbase.client.Get;
059import org.apache.hadoop.hbase.client.Mutation;
060import org.apache.hadoop.hbase.client.Put;
061import org.apache.hadoop.hbase.client.RegionLocator;
062import org.apache.hadoop.hbase.client.Result;
063import org.apache.hadoop.hbase.client.ResultScanner;
064import org.apache.hadoop.hbase.client.Scan;
065import org.apache.hadoop.hbase.client.ScannerCallable;
066import org.apache.hadoop.hbase.client.Table;
067import org.apache.hadoop.hbase.fs.HFileSystem;
068import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
069import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
070import org.apache.hadoop.hbase.mapreduce.TableMapper;
071import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
072import org.apache.hadoop.hbase.mapreduce.WALPlayer;
073import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
074import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
075import org.apache.hadoop.hbase.testclassification.IntegrationTests;
076import org.apache.hadoop.hbase.util.AbstractHBaseTool;
077import org.apache.hadoop.hbase.util.Bytes;
078import org.apache.hadoop.hbase.util.CommonFSUtils;
079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
080import org.apache.hadoop.hbase.util.Random64;
081import org.apache.hadoop.hbase.util.RegionSplitter;
082import org.apache.hadoop.hbase.wal.WALEdit;
083import org.apache.hadoop.hbase.wal.WALKey;
084import org.apache.hadoop.io.BytesWritable;
085import org.apache.hadoop.io.NullWritable;
086import org.apache.hadoop.io.Writable;
087import org.apache.hadoop.mapreduce.Counter;
088import org.apache.hadoop.mapreduce.CounterGroup;
089import org.apache.hadoop.mapreduce.Counters;
090import org.apache.hadoop.mapreduce.InputFormat;
091import org.apache.hadoop.mapreduce.InputSplit;
092import org.apache.hadoop.mapreduce.Job;
093import org.apache.hadoop.mapreduce.JobContext;
094import org.apache.hadoop.mapreduce.Mapper;
095import org.apache.hadoop.mapreduce.RecordReader;
096import org.apache.hadoop.mapreduce.Reducer;
097import org.apache.hadoop.mapreduce.TaskAttemptContext;
098import org.apache.hadoop.mapreduce.TaskAttemptID;
099import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
100import org.apache.hadoop.mapreduce.lib.input.FileSplit;
101import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
102import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
103import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
104import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
105import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
106import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
107import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
108import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
109import org.apache.hadoop.util.Tool;
110import org.apache.hadoop.util.ToolRunner;
111import org.junit.Test;
112import org.junit.experimental.categories.Category;
113import org.slf4j.Logger;
114import org.slf4j.LoggerFactory;
115
116import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
117import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
118import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
119import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
120import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
121import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
122import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
123import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
124
125/**
126 * <p>
127 * This is an integration test borrowed from goraci, written by Keith Turner, which is in turn
128 * inspired by the Accumulo test called continuous ingest (ci). The original source code can be found
129 * here:
130 * <ul>
131 * <li>https://github.com/keith-turner/goraci</li>
132 * <li>https://github.com/enis/goraci/</li>
133 * </ul>
134 * </p>
135 * <p>
136 * Apache Accumulo [0] has a simple test suite that verifies that data is not lost at scale. This
137 * test suite is called continuous ingest. This test runs many ingest clients that continually
138 * create linked lists containing 25 million nodes. At some point the clients are stopped and a map
139 * reduce job is run to ensure no linked list has a hole. A hole indicates data was lost.
140 * </p>
141 * <p>
142 * The nodes in the linked list are random. This causes each linked list to spread across the table.
143 * Therefore if one part of a table loses data, then it will be detected by references in another
144 * part of the table.
145 * </p>
146 * <p>
147 * <h3>THE ANATOMY OF THE TEST</h3> Below is a rough sketch of how data is written. For specific
148 * details look at the Generator code.
149 * </p>
150 * <p>
151 * <ol>
152 * <li>Write out 1 million nodes (1M is the configurable 'width' mentioned below)</li>
153 * <li>Flush the client</li>
154 * <li>Write out 1 million nodes that reference the previous million</li>
155 * <li>If this is the 25th set of 1 million nodes, then update the 1st set of 1 million to point to
156 * the last (25 is configurable; it's the 'wrap multiplier' referred to below)</li>
157 * <li>goto 1</li>
158 * </ol>
159 * </p>
160 * <p>
161 * The key is that nodes only reference flushed nodes. Therefore a node should never reference a
162 * missing node, even if the ingest client is killed at any point in time.
163 * </p>
164 * <p>
165 * When running this test suite with Accumulo there is a script running in parallel called the
166 * Agitator that randomly and continuously kills server processes. The outcome was that many data
167 * loss bugs were found in Accumulo by doing this. This test suite can also help find bugs that
168 * impact uptime and stability when run for days or weeks.
169 * </p>
170 * <p>
171 * This test suite consists of the following
172 * <ul>
173 * <li>a few Java programs</li>
174 * <li>a little helper script to run the Java programs</li>
175 * <li>a Maven script to build it</li>
176 * </ul>
177 * </p>
178 * <p>
179 * When generating data, it's best to have each map task generate a multiple of 25 million. The
180 * reason for this is that circular linked lists are generated every 25M. Not generating a multiple
181 * of 25M will result in some nodes in the linked list not having references. The loss of an
182 * unreferenced node cannot be detected.
183 * </p>
184 * <p>
185 * <h3>Below is a description of the Java programs</h3>
186 * <ul>
187 * <li>{@code Generator} - A map only job that generates data. As stated previously, it's best to
188 * generate data in multiples of 25M. An option is also available to allow concurrent walkers to
189 * select and walk random flushed loops during this phase.</li>
190 * <li>{@code Verify} - A map reduce job that looks for holes. Look at the counts after running.
191 * {@code REFERENCED} and {@code UNREFERENCED} are ok, any {@code UNDEFINED} counts are bad. Do not
192 * run at the same time as the Generator.</li>
193 * <li>{@code Walker} - A standalone program that starts following a linked list and emits timing
194 * info.</li>
195 * <li>{@code Print} - A standalone program that prints nodes in the linked list</li>
196 * <li>{@code Delete} - A standalone program that deletes a single node</li>
197 * </ul>
198 * This class can be run as a unit test, as an integration test, or from the command line.
199 * </p>
200 * <p>
201 * ex:
202 *
203 * <pre>
204 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
205 *    loop 2 1 100000 /temp 1 1000 50 1 0
206 * </pre>
207 * </p>
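 * <p>
 * The generate and verify phases can also be run on their own; the sub-command names follow the
 * program list above, and the paths and counts here are illustrative only:
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList generator 1 25000000 /temp
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList verify /temp/verify 1
 * </pre>
 * </p>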
208 */
209@Category(IntegrationTests.class)
210public class IntegrationTestBigLinkedList extends IntegrationTestBase {
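  // Sentinel 'prev' value (a single zero byte) written for the first batch of each loop, before any
  // predecessor exists; walkers treat a 'prev' of this length as a terminating node.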
211  protected static final byte[] NO_KEY = new byte[1];
212  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
213  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
214  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
215  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
216  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
217
218  // link to the id of the prev node in the linked list
219  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
220
221  // identifier of the mapred task that generated this row
222  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
223
224  // the id of the row within the same client.
225  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
226
227  /** How many rows to write per map task. This has to be a multiple of 25M */
228  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY =
229    "IntegrationTestBigLinkedList.generator.num_rows";
230
231  private static final String GENERATOR_NUM_MAPPERS_KEY =
232    "IntegrationTestBigLinkedList.generator.map.tasks";
233
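  // Batch size: how many nodes are written before each client flush (the 'width').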
234  private static final String GENERATOR_WIDTH_KEY = "IntegrationTestBigLinkedList.generator.width";
235
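  // Wrap multiplier: how many flushed batches are linked into one circular loop.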
236  private static final String GENERATOR_WRAP_KEY = "IntegrationTestBigLinkedList.generator.wrap";
237
238  private static final String CONCURRENT_WALKER_KEY =
239    "IntegrationTestBigLinkedList.generator.concurrentwalkers";
240
241  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
242
243  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
244
245  private static final int WIDTH_DEFAULT = 1000000;
246
247  /**
248   * The 'wrap multiplier' default.
249   */
250  private static final int WRAP_DEFAULT = 25;
251  private static final int ROWKEY_LENGTH = 16;
252
253  private static final int CONCURRENT_WALKER_DEFAULT = 0;
254
255  protected String toRun;
256  protected String[] otherArgs;
257
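  /**
   * A node of the linked list as read back from the table: the row key, the 'prev' reference, the
   * id of the client that wrote it and the node's count within that client.
   */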
258  static class CINode {
259    byte[] key;
260    byte[] prev;
261    String client;
262    long count;
263  }
264
265  /**
266   * A Map only job that generates random linked list and stores them.
267   */
268  static class Generator extends Configured implements Tool {
269    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
270
271    /**
272     * Set this configuration if you want to test that single column family flush works. If set, we
273     * will add a big column family and a small column family on either side of the usual ITBLL
274     * 'meta' column family. When we write out the ITBLL, we will also add to the big column family a
275     * value bigger than that for ITBLL and, for the small one, something way smaller. The idea is
276     * that when flush-by-column-family rather than by-region is enabled, we can see if ITBLL is
277     * broken in any way. Here is how you would pass it:
278     * <p>
279     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
280     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
281     */
282    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
283      "generator.multiple.columnfamilies";
284
285    /**
286     * Set this configuration if you want to scale up the size of test data quickly.
287     * <p>
288     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
289     * -Dgenerator.big.family.value.size=1024 generator 1 10 output
290     */
291    public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";
292
293    public static enum GeneratorCounts {
294      SUCCESS,
295      TERMINATING,
296      UNDEFINED,
297      IOEXCEPTION
298    }
299
300    public static final String USAGE = "Usage : " + Generator.class.getSimpleName()
301      + " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>"
302      + " <num walker threads>] \n"
303      + "Where <num nodes per map> should be a multiple of 'width' * 'wrap multiplier'.\n"
304      + "25M is default because default 'width' is 1M and default 'wrap multiplier' is 25.\n"
305      + "We write out 1M nodes and then flush the client. After 25 flushes, we connect \n"
306      + "first written nodes back to the 25th set.\n"
307      + "Walkers verify random flushed loops during Generation.";
308
309    public Job job;
310
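    /**
     * Synthetic input format: reads no files, creates one split per requested mapper and hands
     * each mapper a stream of randomly generated row keys.
     */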
311    static class GeneratorInputFormat extends InputFormat<BytesWritable, NullWritable> {
312      static class GeneratorInputSplit extends InputSplit implements Writable {
313        @Override
314        public long getLength() throws IOException, InterruptedException {
315          return 1;
316        }
317
318        @Override
319        public String[] getLocations() throws IOException, InterruptedException {
320          return new String[0];
321        }
322
323        @Override
324        public void readFields(DataInput arg0) throws IOException {
325        }
326
327        @Override
328        public void write(DataOutput arg0) throws IOException {
329        }
330      }
331
332      static class GeneratorRecordReader extends RecordReader<BytesWritable, NullWritable> {
333        private long count;
334        private long numNodes;
335        // Use Random64 to avoid issue described in HBASE-21256.
336        private Random64 rand = new Random64();
337
338        @Override
339        public void close() throws IOException {
340        }
341
342        @Override
343        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
344          byte[] bytes = new byte[ROWKEY_LENGTH];
345          rand.nextBytes(bytes);
346          return new BytesWritable(bytes);
347        }
348
349        @Override
350        public NullWritable getCurrentValue() throws IOException, InterruptedException {
351          return NullWritable.get();
352        }
353
354        @Override
355        public float getProgress() throws IOException, InterruptedException {
356          return (float) (count / (double) numNodes);
357        }
358
359        @Override
360        public void initialize(InputSplit arg0, TaskAttemptContext context)
361          throws IOException, InterruptedException {
362          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
363        }
364
365        @Override
366        public boolean nextKeyValue() throws IOException, InterruptedException {
367          return count++ < numNodes;
368        }
369      }
370
371      @Override
372      public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
373        TaskAttemptContext context) throws IOException, InterruptedException {
374        GeneratorRecordReader rr = new GeneratorRecordReader();
375        rr.initialize(split, context);
376        return rr;
377      }
378
379      @Override
380      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
381        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
382
383        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);
384
385        for (int i = 0; i < numMappers; i++) {
386          splits.add(new GeneratorInputSplit());
387        }
388
389        return splits;
390      }
391    }
392
393    /** Ensure output files from the previous job each go, unsplit, to one mapper of the current job */
394    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
395      @Override
396      protected boolean isSplitable(JobContext context, Path filename) {
397        return false;
398      }
399    }
400
401    /**
402     * Some ASCII art time:
403     * <p>
404     * [ . . . ] represents one batch of random longs of length WIDTH
405     *
406     * <pre>
407     *                _________________________
408     *               |                  ______ |
409     *               |                 |      ||
410     *             .-+-----------------+-----.||
411     *             | |                 |     |||
412     * first   = [ . . . . . . . . . . . ]   |||
413     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
414     *             | | | | | | | | | | |     |||
415     * prev    = [ . . . . . . . . . . . ]   |||
416     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
417     *             | | | | | | | | | | |     |||
418     * current = [ . . . . . . . . . . . ]   |||
419     *                                       |||
420     * ...                                   |||
421     *                                       |||
422     * last    = [ . . . . . . . . . . . ]   |||
423     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
424     *             |                 |________||
425     *             |___________________________|
426     * </pre>
427     */
428
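    /**
     * Consumes the random keys, buffers them in batches of 'width', persists each batch with a
     * 'prev' pointer into the previous batch, and closes the loop every 'wrap' nodes by pointing
     * the first batch at the last.
     */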
429    static class GeneratorMapper
430      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
431
432      byte[][] first = null;
433      byte[][] prev = null;
434      byte[][] current = null;
435      byte[] id;
436      long count = 0;
437      int i;
438      BufferedMutator mutator;
439      Connection connection;
440      long numNodes;
441      long wrap;
442      int width;
443      boolean multipleUnevenColumnFamilies;
444      byte[] tinyValue = new byte[] { 't' };
445      byte[] bigValue = null;
446      Configuration conf;
447      // Use Random64 to avoid issue described in HBASE-21256.
448      private Random64 rand = new Random64();
449
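      // Bookkeeping for the optional concurrent walkers: the stop flag, the fully flushed loops
      // (by start node) they may walk, and the walker threads themselves. flushedLoops is guarded
      // by flushedLoopsLock.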
450      volatile boolean walkersStop;
451      int numWalkers;
452      final Object flushedLoopsLock = new Object();
453      volatile List<Long> flushedLoops = new ArrayList<>();
454      List<Thread> walkers = new ArrayList<>();
455
456      @Override
457      protected void setup(Context context) throws IOException, InterruptedException {
458        id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID());
459        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
460        instantiateHTable();
461        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
462        current = new byte[this.width][];
463        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
464        this.wrap = (long) wrapMultiplier * width;
465        this.numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY,
466          (long) WIDTH_DEFAULT * WRAP_DEFAULT);
467        if (this.numNodes < this.wrap) {
468          this.wrap = this.numNodes;
469        }
470        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
471        this.numWalkers =
472          context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
473        this.walkersStop = false;
474        this.conf = context.getConfiguration();
475
476        if (multipleUnevenColumnFamilies) {
477          int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
478          int limit =
479            context.getConfiguration().getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
480              ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);
481
482          Preconditions.checkArgument(n <= limit, "%s(%s) > %s(%s)", BIG_FAMILY_VALUE_SIZE_KEY, n,
483            ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);
484
485          bigValue = new byte[n];
486          rand.nextBytes(bigValue);
487          LOG.info("Create a bigValue with " + n + " bytes.");
488        }
489
490        Preconditions.checkArgument(numNodes > 0, "numNodes(%s) <= 0", numNodes);
491        Preconditions.checkArgument(numNodes % width == 0, "numNodes(%s) mod width(%s) != 0",
492          numNodes, width);
493        Preconditions.checkArgument(numNodes % wrap == 0, "numNodes(%s) mod wrap(%s) != 0",
494          numNodes, wrap);
495      }
496
497      protected void instantiateHTable() throws IOException {
498        mutator = connection
499          .getBufferedMutator(new BufferedMutatorParams(getTableName(connection.getConfiguration()))
500            .writeBufferSize(4 * 1024 * 1024));
501      }
502
503      @Override
504      protected void cleanup(Context context) throws IOException, InterruptedException {
505        joinWalkers();
506        mutator.close();
507        connection.close();
508      }
509
510      @Override
511      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
512        current[i] = new byte[key.getLength()];
513        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
514        if (++i == current.length) {
515          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i={}", current.length,
516            count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i);
517          persist(output, count, prev, current, id);
518          i = 0;
519
520          if (first == null) {
521            first = current;
522          }
523          prev = current;
524          current = new byte[this.width][];
525
526          count += current.length;
527          output.setStatus("Count " + count);
528
529          if (count % wrap == 0) {
530            // this block of code turns the 1 million linked lists of length 25 into one giant
531            // circular linked list of 25 million nodes
532            circularLeftShift(first);
533            persist(output, -1, prev, first, null);
534            // At this point the entire loop has been flushed so we can add one of its nodes to the
535            // concurrent walker
536            if (numWalkers > 0) {
537              addFlushed(key.getBytes());
538              if (walkers.isEmpty()) {
539                startWalkers(numWalkers, conf, output);
540              }
541            }
542            first = null;
543            prev = null;
544          }
545        }
546      }
547
548      private static <T> void circularLeftShift(T[] first) {
549        T ez = first[0];
550        System.arraycopy(first, 1, first, 0, first.length - 1);
551        first[first.length - 1] = ez;
552      }
553
554      private void addFlushed(byte[] rowKey) {
555        synchronized (flushedLoopsLock) {
556          flushedLoops.add(Bytes.toLong(rowKey));
557          flushedLoopsLock.notifyAll();
558        }
559      }
560
561      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
562        throws IOException {
563        for (int i = 0; i < current.length; i++) {
564
565          if (i % 100 == 0) {
566            // Tickle progress every so often, else the MapReduce framework will think we have hung
567            output.progress();
568          }
569
570          Put put = new Put(current[i]);
571          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
572
573          if (count >= 0) {
574            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
575          }
576          if (id != null) {
577            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
578          }
579          // See if we are to write multiple columns.
580          if (this.multipleUnevenColumnFamilies) {
581            // Use any column name.
582            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
583            // Use any column name.
584            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
585          }
586          mutator.mutate(put);
587        }
588
589        mutator.flush();
590      }
591
592      private void startWalkers(int numWalkers, Configuration conf, Context context) {
593        LOG.info("Starting " + numWalkers + " concurrent walkers");
594        for (int i = 0; i < numWalkers; i++) {
595          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
596          walker.start();
597          walkers.add(walker);
598        }
599      }
600
601      private void joinWalkers() {
602        synchronized (flushedLoopsLock) {
603          walkersStop = true;
604          flushedLoopsLock.notifyAll();
605        }
606        for (Thread walker : walkers) {
607          Uninterruptibles.joinUninterruptibly(walker);
608        }
609      }
610
611      /**
612       * Randomly selects and walks a flushed loop concurrently with the Generator mapper by
613       * spawning {@link ConcurrentWalker}s with specified start nodes. These walkers are
614       * configured to only log erroneous nodes.
615       */
617      public class ContinuousConcurrentWalker implements Runnable {
618
619        ConcurrentWalker walker;
620        Configuration conf;
621        Context context;
622
623        public ContinuousConcurrentWalker(Configuration conf, Context context) {
624          this.conf = conf;
625          this.context = context;
626        }
627
628        @Override
629        public void run() {
630          while (!walkersStop) {
631            try {
632              long node = selectLoop();
633              try {
634                walkLoop(node);
635              } catch (IOException e) {
636                context.getCounter(GeneratorCounts.IOEXCEPTION).increment(1l);
637                return;
638              }
639            } catch (InterruptedException e) {
640              return;
641            }
642          }
643        }
644
645        private void walkLoop(long node) throws IOException {
646          walker = new ConcurrentWalker(context);
647          walker.setConf(conf);
648          walker.run(node, wrap);
649        }
650
651        private long selectLoop() throws InterruptedException {
652          synchronized (flushedLoopsLock) {
653            while (flushedLoops.isEmpty() && !walkersStop) {
654              flushedLoopsLock.wait();
655            }
656            if (walkersStop) {
657              throw new InterruptedException();
658            }
659            return flushedLoops.get(ThreadLocalRandom.current().nextInt(flushedLoops.size()));
660          }
661        }
662      }
663
664      public static class ConcurrentWalker extends WalkerBase {
665
666        Context context;
667
668        public ConcurrentWalker(Context context) {
669          this.context = context;
670        }
671
672        public void run(long startKeyIn, long maxQueriesIn) throws IOException {
673
674          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
675          byte[] startKey = Bytes.toBytes(startKeyIn);
676
677          Connection connection = ConnectionFactory.createConnection(getConf());
678          Table table = connection.getTable(getTableName(getConf()));
679          long numQueries = 0;
680          // Walk the list starting from startKey. In the case of a circular (or P-shaped) list
681          // this would walk forever, so the walk is capped at maxQueries.
683
684          CINode node = findStartNode(table, startKey);
685          if (node == null) {
686            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
687            throw new IOException("Start node not found: " + startKeyIn);
688          }
689          while (numQueries < maxQueries) {
690            numQueries++;
691            byte[] prev = node.prev;
692            node = getNode(prev, table, node);
693            if (node == null) {
694              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
695              context.getCounter(GeneratorCounts.UNDEFINED).increment(1l);
696            } else if (node.prev.length == NO_KEY.length) {
697              LOG.error(
698                "ConcurrentWalker found TERMINATING NODE: " + Bytes.toStringBinary(node.key));
699              context.getCounter(GeneratorCounts.TERMINATING).increment(1l);
700            } else {
701              // Increment for successful walk
702              context.getCounter(GeneratorCounts.SUCCESS).increment(1l);
703            }
704          }
705          table.close();
706          connection.close();
707        }
708      }
709    }
710
711    @Override
712    public int run(String[] args) throws Exception {
713      if (args.length < 3) {
714        System.err.println(USAGE);
715        return 1;
716      }
717      try {
718        int numMappers = Integer.parseInt(args[0]);
719        long numNodes = Long.parseLong(args[1]);
720        Path tmpOutput = new Path(args[2]);
721        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
722        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
723        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
724        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
725      } catch (NumberFormatException e) {
726        System.err.println("Parsing generator arguments failed: " + e.getMessage());
727        System.err.println(USAGE);
728        return 1;
729      }
730    }
731
732    protected void createSchema() throws IOException {
733      Configuration conf = getConf();
734      TableName tableName = getTableName(conf);
735      try (Connection conn = ConnectionFactory.createConnection(conf);
736        Admin admin = conn.getAdmin()) {
737        if (!admin.tableExists(tableName)) {
738          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
739          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
740          // Always add these families. Just skip writing to them when we do not test per CF flush.
741          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
742          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
743          // if -DuseMob=true force all data through mob path.
744          if (conf.getBoolean("useMob", false)) {
745            for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
746              hcd.setMobEnabled(true);
747              hcd.setMobThreshold(4);
748            }
749          }
750
751          // If we want to pre-split compute how many splits.
752          if (
753            conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
754              HBaseTestingUtility.PRESPLIT_TEST_TABLE)
755          ) {
756            int numberOfServers = admin.getRegionServers().size();
757            if (numberOfServers == 0) {
758              throw new IllegalStateException("No live regionservers");
759            }
760            int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
761              HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
762            int totalNumberOfRegions = numberOfServers * regionsPerServer;
763            LOG.info("Number of live regionservers: " + numberOfServers + ", "
764              + "pre-splitting table into " + totalNumberOfRegions + " regions "
765              + "(default regions per server: " + regionsPerServer + ")");
766
767            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
768
769            admin.createTable(htd, splits);
770          } else {
771            // Looks like we're just letting things play out.
772            // Create a table with one region by default.
773            // This will make the splitting work hard.
774            admin.createTable(htd);
775          }
776        }
777      } catch (MasterNotRunningException e) {
778        LOG.error("Master not running", e);
779        throw new IOException(e);
780      }
781    }
782
783    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width,
784      Integer wrapMultiplier, Integer numWalkers) throws Exception {
785      LOG.info(
786        "Running RandomInputGenerator with numMappers=" + numMappers + ", numNodes=" + numNodes);
787      Job job = Job.getInstance(getConf());
788
789      job.setJobName("Random Input Generator");
790      job.setNumReduceTasks(0);
791      job.setJarByClass(getClass());
792
793      job.setInputFormatClass(GeneratorInputFormat.class);
794      job.setOutputKeyClass(BytesWritable.class);
795      job.setOutputValueClass(NullWritable.class);
796
797      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
798
799      job.setMapperClass(Mapper.class); // identity mapper
800
801      FileOutputFormat.setOutputPath(job, tmpOutput);
802      job.setOutputFormatClass(SequenceFileOutputFormat.class);
803      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
804
805      boolean success = jobCompletion(job);
806
807      return success ? 0 : 1;
808    }
809
810    public int runGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width,
811      Integer wrapMultiplier, Integer numWalkers) throws Exception {
812      LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
813      createSchema();
814      job = Job.getInstance(getConf());
815
816      job.setJobName("Link Generator");
817      job.setNumReduceTasks(0);
818      job.setJarByClass(getClass());
819
820      FileInputFormat.setInputPaths(job, tmpOutput);
821      job.setInputFormatClass(OneFilePerMapperSFIF.class);
822      job.setOutputKeyClass(NullWritable.class);
823      job.setOutputValueClass(NullWritable.class);
824
825      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
826
827      setMapperForGenerator(job);
828
829      job.setOutputFormatClass(NullOutputFormat.class);
830
831      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
832      TableMapReduceUtil.addDependencyJars(job);
833      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
834        AbstractHBaseTool.class);
835      TableMapReduceUtil.initCredentials(job);
836
837      boolean success = jobCompletion(job);
838
839      return success ? 0 : 1;
840    }
841
842    protected boolean jobCompletion(Job job)
843      throws IOException, InterruptedException, ClassNotFoundException {
844      boolean success = job.waitForCompletion(true);
845      return success;
846    }
847
848    protected void setMapperForGenerator(Job job) {
849      job.setMapperClass(GeneratorMapper.class);
850    }
851
852    public int run(int numMappers, long numNodes, Path tmpOutput, Integer width,
853      Integer wrapMultiplier, Integer numWalkers) throws Exception {
854      int ret =
855        runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
856      if (ret > 0) {
857        return ret;
858      }
859      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
860    }
861
862    public boolean verify() {
863      try {
864        Counters counters = job.getCounters();
865        if (counters == null) {
866          LOG.info("Counters object was null, Generator verification cannot be performed."
867            + " This is commonly a result of insufficient YARN configuration.");
868          return false;
869        }
870
871        if (
872          counters.findCounter(GeneratorCounts.TERMINATING).getValue() > 0
873            || counters.findCounter(GeneratorCounts.UNDEFINED).getValue() > 0
874            || counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue() > 0
875        ) {
876          LOG.error("Concurrent walker failed to verify during Generation phase");
877          LOG.error(
878            "TERMINATING nodes: " + counters.findCounter(GeneratorCounts.TERMINATING).getValue());
879          LOG.error(
880            "UNDEFINED nodes: " + counters.findCounter(GeneratorCounts.UNDEFINED).getValue());
881          LOG.error(
882            "IOEXCEPTION nodes: " + counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue());
883          return false;
884        }
885      } catch (IOException e) {
886        LOG.info("Generator verification could not find counter");
887        return false;
888      }
889      return true;
890    }
891  }
892
893  /**
894   * Tool to search for missing rows in WALs and hfiles. Pass in a file or dir of keys to search for.
895   * The key file must have been written by the Verify step (we depend on the format it writes out).
896   * We'll read the keys in and then search the hbase WALs and oldWALs dirs (some of this is TODO).
897   */
898  static class Search extends Configured implements Tool {
899    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
900    protected Job job;
901
902    private static void printUsage(final String error) {
903      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
904      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
905    }
906
907    @Override
908    public int run(String[] args) throws Exception {
909      if (args.length < 1 || args.length > 2) {
910        printUsage(null);
911        return 1;
912      }
913      Path inputDir = new Path(args[0]);
914      int numMappers = 1;
915      if (args.length > 1) {
916        numMappers = Integer.parseInt(args[1]);
917      }
918      return run(inputDir, numMappers);
919    }
920
921    /**
922     * WALPlayer override that searches for keys loaded in the setup.
923     */
924    public static class WALSearcher extends WALPlayer {
925      public WALSearcher(Configuration conf) {
926        super(conf);
927      }
928
929      /**
930       * The actual searcher mapper.
931       */
932      public static class WALMapperSearcher extends WALMapper {
933        private SortedSet<byte[]> keysToFind;
934        private AtomicInteger rows = new AtomicInteger(0);
935
936        @Override
937        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
938          throws IOException {
939          super.setup(context);
940          try {
941            this.keysToFind = readKeysToSearch(context.getConfiguration());
942            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
943          } catch (InterruptedException e) {
944            throw new InterruptedIOException(e.toString());
945          }
946        }
947
948        @Override
949        protected boolean filter(Context context, Cell cell) {
950          // TODO: Can I do a better compare than this copying out key?
951          byte[] row = new byte[cell.getRowLength()];
952          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
953          boolean b = this.keysToFind.contains(row);
954          if (b) {
955            String keyStr = Bytes.toStringBinary(row);
956            try {
957              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
958            } catch (IOException | InterruptedException e) {
959              LOG.warn(e.toString(), e);
960            }
961            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
962              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
963            }
964            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
965          }
966          return b;
967        }
968      }
969
970      // Put in place the above WALMapperSearcher.
971      @Override
972      public Job createSubmittableJob(String[] args) throws IOException {
973        Job job = super.createSubmittableJob(args);
974        // Call my class instead.
975        job.setJarByClass(WALMapperSearcher.class);
976        job.setMapperClass(WALMapperSearcher.class);
977        job.setOutputFormatClass(NullOutputFormat.class);
978        return job;
979      }
980    }
981
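    // Counter group under which found keys are reported, and the config key naming the directory
    // of Verify output from which the keys to search for are loaded.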
982    static final String FOUND_GROUP_KEY = "Found";
983    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
984
985    public int run(Path inputDir, int numMappers) throws Exception {
986      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
987      SortedSet<byte[]> keys = readKeysToSearch(getConf());
988      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
989      LOG.info("Count of keys to find: " + keys.size());
990      for (byte[] key : keys)
991        LOG.info("Key: " + Bytes.toStringBinary(key));
992      // Now read all WALs. In two dirs. Presumes certain layout.
993      Path walsDir =
994        new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
995      Path oldWalsDir =
996        new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
997      LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers
998        + " against " + getConf().get(HConstants.HBASE_DIR));
999      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
1000        new String[] { walsDir.toString(), "" });
1001      if (ret != 0) {
1002        return ret;
1003      }
1004      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
1005        new String[] { oldWalsDir.toString(), "" });
1006    }
1007
1008    static SortedSet<byte[]> readKeysToSearch(final Configuration conf)
1009      throws IOException, InterruptedException {
1010      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
1011      FileSystem fs = FileSystem.get(conf);
1012      SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1013      if (!fs.exists(keysInputDir)) {
1014        throw new FileNotFoundException(keysInputDir.toString());
1015      }
1016      if (!fs.isDirectory(keysInputDir)) {
1017        throw new UnsupportedOperationException("TODO");
1018      } else {
1019        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
1020        while (iterator.hasNext()) {
1021          LocatedFileStatus keyFileStatus = iterator.next();
1022          // Skip "_SUCCESS" file.
1023          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
1024          result.addAll(readFileToSearch(conf, keyFileStatus));
1025        }
1026      }
1027      return result;
1028    }
1029
1030    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
1031      final LocatedFileStatus keyFileStatus) throws IOException, InterruptedException {
1032      SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1033      // Return entries that are flagged VerifyCounts.UNDEFINED in the value. Return the row. This
1034      // is what is missing.
1036      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
1037      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
1038        new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
1039        InputSplit is =
1040          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String[] {});
1041        rr.initialize(is, context);
1042        while (rr.nextKeyValue()) {
1043          rr.getCurrentKey();
1044          BytesWritable bw = rr.getCurrentValue();
1045          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.VerifyCounts.UNDEFINED) {
1046            byte[] key = new byte[rr.getCurrentKey().getLength()];
1047            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
1048              rr.getCurrentKey().getLength());
1049            result.add(key);
1050          }
1051        }
1052      }
1053      return result;
1054    }
1055  }
1056
1057  /**
1058   * A Map Reduce job that verifies that the linked lists generated by {@link Generator} do not have
1059   * any holes.
1060   */
1061  static class Verify extends Configured implements Tool {
1062    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
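    // Single-byte markers emitted by VerifyMapper for each row it scans: 0 = row is defined,
    // 1 = row is defined but is missing the big/tiny column families.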
1063    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
1064    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
1065    protected Job job;
1066
1067    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
1068      private BytesWritable row = new BytesWritable();
1069      private BytesWritable ref = new BytesWritable();
1070      private boolean multipleUnevenColumnFamilies;
1071
1072      @Override
1073      protected void
1074        setup(Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
1075          throws IOException, InterruptedException {
1076        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
1077      }
1078
1079      @Override
1080      protected void map(ImmutableBytesWritable key, Result value, Context context)
1081        throws IOException, InterruptedException {
1082        byte[] rowKey = key.get();
1083        row.set(rowKey, 0, rowKey.length);
1084        if (
1085          multipleUnevenColumnFamilies && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME)
1086            || !value.containsColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME))
1087        ) {
1088          context.write(row, DEF_LOST_FAMILIES);
1089        } else {
1090          context.write(row, DEF);
1091        }
1092        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
1093        if (prev != null && prev.length > 0) {
1094          ref.set(prev, 0, prev.length);
1095          context.write(ref, row);
1096        } else {
1097          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
1098        }
1099      }
1100    }
1101
1102    /**
1103     * Don't change the order of these enums. Their ordinals are used as the type flag when we emit
1104     * problem rows from the reducer.
1105     */
1106    public static enum VerifyCounts {
1107      UNREFERENCED,
1108      UNDEFINED,
1109      REFERENCED,
1110      CORRUPT,
1111      EXTRAREFERENCES,
1112      EXTRA_UNDEF_REFERENCES,
1113      LOST_FAMILIES
1114    }
1115
1116    /**
1117     * Per reducer, we output problem rows as byte arrays so they can be used as input for subsequent
1118     * investigative mapreduce jobs. Each emitted value is prefaced by a flag saying what sort of
1119     * emission it is; the flag is the VerifyCounts enum ordinal written as a two-byte short.
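     * <p>
     * A sketch of the emitted value layout:
     *
     * <pre>
     * [two-byte VerifyCounts ordinal][row or reference bytes]
     * </pre>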
1120     */
1121    public static class VerifyReducer
1122      extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
1123      private ArrayList<byte[]> refs = new ArrayList<>();
1124      private final BytesWritable UNREF =
1125        new BytesWritable(addPrefixFlag(VerifyCounts.UNREFERENCED.ordinal(), new byte[] {}));
1126      private final BytesWritable LOSTFAM =
1127        new BytesWritable(addPrefixFlag(VerifyCounts.LOST_FAMILIES.ordinal(), new byte[] {}));
1128
1129      private AtomicInteger rows = new AtomicInteger(0);
1130      private Connection connection;
1131
1132      @Override
1133      protected void
1134        setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1135          throws IOException, InterruptedException {
1136        super.setup(context);
1137        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
1138      }
1139
1140      @Override
1141      protected void
1142        cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1143          throws IOException, InterruptedException {
1144        if (this.connection != null) {
1145          this.connection.close();
1146        }
1147        super.cleanup(context);
1148      }
1149
1150      /**
1151       * @return A new byte array that has <code>ordinal</code> as a prefix on the front, taking up
1152       * Bytes.SIZEOF_SHORT bytes, followed by <code>r</code>
1153       */
1154      public static byte[] addPrefixFlag(final int ordinal, final byte[] r) {
1155        byte[] prefix = Bytes.toBytes((short) ordinal);
1156        if (prefix.length != Bytes.SIZEOF_SHORT) {
1157          throw new RuntimeException("Unexpected size: " + prefix.length);
1158        }
1159        byte[] result = new byte[prefix.length + r.length];
1160        System.arraycopy(prefix, 0, result, 0, prefix.length);
1161        System.arraycopy(r, 0, result, prefix.length, r.length);
1162        return result;
1163      }
1164
1165      /**
1166       * @return Type from the VerifyCounts enum of this row. Reads the prefix added by
1167       * {@link #addPrefixFlag(int, byte[])}
1168       */
1169      public static VerifyCounts whichType(final byte[] bs) {
1170        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
1171        return VerifyCounts.values()[ordinal];
1172      }
1173
1174      /**
1175       * @return Row bytes minus the type flag.
1176       */
1177      public static byte[] getRowOnly(BytesWritable bw) {
1178        byte[] bytes = new byte[bw.getLength() - Bytes.SIZEOF_SHORT];
1179        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
1180        return bytes;
1181      }
1182
1183      @Override
1184      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
1185        throws IOException, InterruptedException {
1186        int defCount = 0;
1187        boolean lostFamilies = false;
1188        refs.clear();
1189        for (BytesWritable type : values) {
1190          if (type.getLength() == DEF.getLength()) {
1191            defCount++;
1192            if (type.getBytes()[0] == 1) {
1193              lostFamilies = true;
1194            }
1195          } else {
1196            byte[] bytes = new byte[type.getLength()];
1197            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
1198            refs.add(bytes);
1199          }
1200        }
1201
1202        // TODO check for more than one def, should not happen
1203        StringBuilder refsSb = null;
1204        if (defCount == 0 || refs.size() != 1) {
1205          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1206          refsSb = dumpExtraInfoOnRefs(key, context, refs);
1207          LOG.error("LinkedListError: key=" + keyString + ", reference(s)="
1208            + (refsSb != null ? refsSb.toString() : ""));
1209        }
1210        if (lostFamilies) {
1211          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1212          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
1213          context.getCounter(VerifyCounts.LOST_FAMILIES).increment(1);
1214          context.write(key, LOSTFAM);
1215        }
1216
1217        if (defCount == 0 && refs.size() > 0) {
1218          // This is bad, found a node that is referenced but not defined. It must have been
1219          // lost, emit some info about this node for debugging purposes.
1220          // Write out a line per reference. If more than one, flag it.
1221          for (int i = 0; i < refs.size(); i++) {
1222            byte[] bs = refs.get(i);
1223            int ordinal;
1224            if (i <= 0) {
1225              ordinal = VerifyCounts.UNDEFINED.ordinal();
1226              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1227              context.getCounter(VerifyCounts.UNDEFINED).increment(1);
1228            } else {
1229              ordinal = VerifyCounts.EXTRA_UNDEF_REFERENCES.ordinal();
1230              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1231            }
1232          }
1233          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1234            // Print out the missing row; doing a get on the reference gives info on when the
1235            // referencer was added which can help a little with debugging. This info is only
1236            // available in the task output -- the 'LinkedListError: key=...' log message above.
1237            // What we emit here is useless for debugging.
1238            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1239            context.getCounter("undef", keyString).increment(1);
1240          }
1241        } else if (defCount > 0 && refs.isEmpty()) {
1242          // node is defined but not referenced
1243          context.write(key, UNREF);
1244          context.getCounter(VerifyCounts.UNREFERENCED).increment(1);
1245          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1246            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1247            context.getCounter("unref", keyString).increment(1);
1248          }
1249        } else {
1250          if (refs.size() > 1) {
1251            // Skip first reference.
1252            for (int i = 1; i < refs.size(); i++) {
1253              context.write(key, new BytesWritable(
1254                addPrefixFlag(VerifyCounts.EXTRAREFERENCES.ordinal(), refs.get(i))));
1255            }
1256            context.getCounter(VerifyCounts.EXTRAREFERENCES).increment(refs.size() - 1);
1257          }
1258          // node is defined and referenced
1259          context.getCounter(VerifyCounts.REFERENCED).increment(1);
1260        }
1261      }
1262
1263      /**
1264       * Dump out extra info around references if there are any. Helps debugging.
1265       * @return StringBuilder filled with references if any.
1266       */
1267      @SuppressWarnings("JavaUtilDate")
1268      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1269        final List<byte[]> refs) throws IOException {
1270        StringBuilder refsSb = null;
1271        if (refs.isEmpty()) return refsSb;
1272        refsSb = new StringBuilder();
1273        String comma = "";
1274        // If a row is a reference but has no definition, print the content of the row that has
1275        // this row as a 'prev'; it will help debug. The missing row was written just before
1276        // the row we are dumping out here.
1277        TableName tn = getTableName(context.getConfiguration());
1278        try (Table t = this.connection.getTable(tn)) {
1279          for (byte[] ref : refs) {
1280            Result r = t.get(new Get(ref));
1281            List<Cell> cells = r.listCells();
1282            String ts = (cells != null && !cells.isEmpty())
1283              ? new java.util.Date(cells.get(0).getTimestamp()).toString()
1284              : "";
1285            byte[] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1286            String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : "";
1287            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1288            long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1;
1289            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1290            String refRegionLocation = "";
1291            String keyRegionLocation = "";
1292            if (b != null && b.length > 0) {
1293              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1294                HRegionLocation hrl = rl.getRegionLocation(b);
1295                if (hrl != null) refRegionLocation = hrl.toString();
1296                // Key here probably has trailing zeros on it.
1297                hrl = rl.getRegionLocation(key.getBytes());
1298                if (hrl != null) keyRegionLocation = hrl.toString();
1299              }
1300            }
1301            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref)
1302              + ", refPrevEqualsKey="
              + (b != null
                && Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0)
1304              + ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength())
1305              + ", ref row date=" + ts + ", jobStr=" + jobStr + ", ref row count=" + count
1306              + ", ref row regionLocation=" + refRegionLocation + ", key row regionLocation="
1307              + keyRegionLocation);
1308            refsSb.append(comma);
1309            comma = ",";
1310            refsSb.append(Bytes.toStringBinary(ref));
1311          }
1312        }
1313        return refsSb;
1314      }
1315    }
1316
1317    @Override
1318    public int run(String[] args) throws Exception {
1319      if (args.length != 2) {
1320        System.out
1321          .println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
1322        return 0;
1323      }
1324
1325      String outputDir = args[0];
1326      int numReducers = Integer.parseInt(args[1]);
1327
1328      return run(outputDir, numReducers);
1329    }
1330
1331    public int run(String outputDir, int numReducers) throws Exception {
1332      return run(new Path(outputDir), numReducers);
1333    }
1334
1335    public int run(Path outputDir, int numReducers) throws Exception {
1336      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
1337
1338      job = Job.getInstance(getConf());
1339
1340      job.setJobName("Link Verifier");
1341      job.setNumReduceTasks(numReducers);
1342      job.setJarByClass(getClass());
1343
1344      setJobScannerConf(job);
1345
1346      Scan scan = new Scan();
1347      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
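      // Large scanner caching plus no block caching: the verify scan reads every row exactly
      // once, so caching the blocks would only churn the region server block cache.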
1348      scan.setCaching(10000);
1349      scan.setCacheBlocks(false);
1350      if (isMultiUnevenColumnFamilies(getConf())) {
1351        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1352        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1353      }
1354
1355      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1356        VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1357      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
1358        AbstractHBaseTool.class);
1359
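      // Disable speculative execution so each region is scanned by a single map attempt;
      // duplicate attempts only add load to the cluster under test.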
1360      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1361
1362      job.setReducerClass(VerifyReducer.class);
1363      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1364      job.setOutputKeyClass(BytesWritable.class);
1365      job.setOutputValueClass(BytesWritable.class);
1366      TextOutputFormat.setOutputPath(job, outputDir);
1367
1368      boolean success = job.waitForCompletion(true);
1369
1370      if (success) {
1371        Counters counters = job.getCounters();
1372        if (null == counters) {
1373          LOG.warn("Counters were null, cannot verify Job completion."
1374            + " This is commonly a result of insufficient YARN configuration.");
1375          // We don't have access to the counters to know if we have "bad" counts
1376          return 0;
1377        }
1378
1379        // If we find no unexpected values, the job didn't outright fail
1380        if (verifyUnexpectedValues(counters)) {
1381          // We didn't check referenced+unreferenced counts, leave that to visual inspection
1382          return 0;
1383        }
1384      }
1385
1386      // We failed
1387      return 1;
1388    }
1389
1390    public boolean verify(long expectedReferenced) throws Exception {
1391      if (job == null) {
1392        throw new IllegalStateException("You should call run() first");
1393      }
1394
1395      Counters counters = job.getCounters();
1396      if (counters == null) {
1397        LOG.info("Counters object was null, write verification cannot be performed."
1398          + " This is commonly a result of insufficient YARN configuration.");
1399        return false;
1400      }
1401
1402      // Run through each check, even if we fail one early
1403      boolean success = verifyExpectedValues(expectedReferenced, counters);
1404
1405      if (!verifyUnexpectedValues(counters)) {
1406        // We found counter objects which imply failure
1407        success = false;
1408      }
1409
1410      if (!success) {
1411        handleFailure(counters);
1412      }
1413      return success;
1414    }
1415
    /**
     * Verify the values in the Counters against the expected number of entries written.
     * @param expectedReferenced Expected number of referenced entries
     * @param counters The Job's Counters object
     * @return True if the values match what's expected, false otherwise
     */
1421    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1422      final Counter referenced = counters.findCounter(VerifyCounts.REFERENCED);
1423      final Counter unreferenced = counters.findCounter(VerifyCounts.UNREFERENCED);
1424      boolean success = true;
1425
1426      if (expectedReferenced != referenced.getValue()) {
        LOG.error("Expected referenced count does not match actual referenced count. "
          + "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
1429        success = false;
1430      }
1431
1432      if (unreferenced.getValue() > 0) {
1433        final Counter multiref = counters.findCounter(VerifyCounts.EXTRAREFERENCES);
1434        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
1435        LOG.error(
1436          "Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
1437            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1438        success = false;
1439      }
1440
1441      return success;
1442    }
1443
    /**
     * Verify that the Counters don't contain values which indicate an outright failure from the
     * Reducers.
     * @param counters The Job's counters
     * @return True if the "bad" counter objects are 0, false otherwise
     */
1449    protected boolean verifyUnexpectedValues(Counters counters) {
1450      final Counter undefined = counters.findCounter(VerifyCounts.UNDEFINED);
1451      final Counter lostfamilies = counters.findCounter(VerifyCounts.LOST_FAMILIES);
1452      boolean success = true;
1453
1454      if (undefined.getValue() > 0) {
1455        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1456        success = false;
1457      }
1458
1459      if (lostfamilies.getValue() > 0) {
1460        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1461        success = false;
1462      }
1463
1464      return success;
1465    }
1466
1467    protected void handleFailure(Counters counters) throws IOException {
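      // For every row recorded under the "undef" and "unref" counter groups, look up and log its
      // region location so the failure can be tied to specific regions and region servers.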
1468      Configuration conf = job.getConfiguration();
1469      TableName tableName = getTableName(conf);
1470      try (Connection conn = ConnectionFactory.createConnection(conf)) {
1471        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
1472          CounterGroup g = counters.getGroup("undef");
1473          Iterator<Counter> it = g.iterator();
1474          while (it.hasNext()) {
1475            String keyString = it.next().getName();
1476            byte[] key = Bytes.toBytes(keyString);
1477            HRegionLocation loc = rl.getRegionLocation(key, true);
1478            LOG.error("undefined row " + keyString + ", " + loc);
1479          }
1480          g = counters.getGroup("unref");
1481          it = g.iterator();
1482          while (it.hasNext()) {
1483            String keyString = it.next().getName();
1484            byte[] key = Bytes.toBytes(keyString);
1485            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("unreferenced row " + keyString + ", " + loc);
1487          }
1488        }
1489      }
1490    }
1491  }
1492
1493  /**
1494   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
1495   * adds more data.
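   * <p>
   * A minimal programmatic sketch of a single iteration, mirroring the arguments used by
   * {@link IntegrationTestBigLinkedList#testContinuousIngest()}; the output path is a
   * placeholder:
   *
   * <pre>
   * int rc = ToolRunner.run(HBaseConfiguration.create(), new Loop(),
   *   new String[] { "1", "1", "2000000", "/tmp/IntegrationTestBigLinkedList", "1" });
   * </pre>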
1496   */
1497  static class Loop extends Configured implements Tool {
1498
1499    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> "
      + "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>"
      + " <num walker threads>]\n"
      + "where <num nodes per mapper> should be a multiple of width*wrap multiplier, 25M by default.\n"
      + "Walkers will select and verify random flushed loops during Generation.";
1505
1506    IntegrationTestBigLinkedList it;
1507
1508    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
1509      Integer wrapMultiplier, Integer numWalkers) throws Exception {
1510      Path outputPath = new Path(outputDir);
1511      UUID uuid = UUID.randomUUID(); // create a random UUID.
1512      Path generatorOutput = new Path(outputPath, uuid.toString());
1513
1514      Generator generator = new Generator();
1515      generator.setConf(getConf());
1516      int retCode =
1517        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
1518      if (retCode > 0) {
1519        throw new RuntimeException("Generator failed with return code: " + retCode);
1520      }
1521      if (numWalkers > 0) {
1522        if (!generator.verify()) {
1523          throw new RuntimeException("Generator.verify failed");
1524        }
1525      }
1526      LOG.info("Generator finished with success. Total nodes=" + numNodes);
1527    }
1528
1529    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes)
1530      throws Exception {
1531      Path outputPath = new Path(outputDir);
1532      UUID uuid = UUID.randomUUID(); // create a random UUID.
1533      Path iterationOutput = new Path(outputPath, uuid.toString());
1534
1535      Verify verify = new Verify();
1536      verify.setConf(getConf());
1537      int retCode = verify.run(iterationOutput, numReducers);
1538      if (retCode > 0) {
1539        throw new RuntimeException("Verify.run failed with return code: " + retCode);
1540      }
1541
1542      if (!verify.verify(expectedNumNodes)) {
1543        throw new RuntimeException("Verify.verify failed");
1544      }
1545      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
1546    }
1547
1548    @Override
1549    public int run(String[] args) throws Exception {
1550      if (args.length < 5) {
1551        System.err.println(USAGE);
1552        return 1;
1553      }
1554      try {
1555        int numIterations = Integer.parseInt(args[0]);
1556        int numMappers = Integer.parseInt(args[1]);
1557        long numNodes = Long.parseLong(args[2]);
1558        String outputDir = args[3];
1559        int numReducers = Integer.parseInt(args[4]);
1560        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1561        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1562        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
1563
1564        long expectedNumNodes = 0;
1565
1566        if (numIterations < 0) {
1567          numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
1568        }
1569        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
1570        for (int i = 0; i < numIterations; i++) {
1571          LOG.info("Starting iteration = " + i);
1572          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
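          // Data accumulates across iterations, so verify against the running total.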
1573          expectedNumNodes += numMappers * numNodes;
1574          runVerify(outputDir, numReducers, expectedNumNodes);
1575        }
1576        return 0;
1577      } catch (NumberFormatException e) {
1578        System.err.println("Parsing loop arguments failed: " + e.getMessage());
1579        System.err.println(USAGE);
1580        return 1;
1581      }
1582    }
1583  }
1584
1585  /**
1586   * A stand alone program that prints out portions of a list created by {@link Generator}
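   * <p>
   * Illustrative invocation that prints at most ten nodes from the configured table:
   *
   * <pre>
   * int rc = ToolRunner.run(HBaseConfiguration.create(), new Print(),
   *   new String[] { "-l", "10" });
   * </pre>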
1587   */
1588  private static class Print extends Configured implements Tool {
1589    @Override
1590    public int run(String[] args) throws Exception {
1591      Options options = new Options();
1592      options.addOption("s", "start", true, "start key");
1593      options.addOption("e", "end", true, "end key");
1594      options.addOption("l", "limit", true, "number to print");
1595
1596      GnuParser parser = new GnuParser();
1597      CommandLine cmd = null;
1598      try {
1599        cmd = parser.parse(options, args);
1600        if (cmd.getArgs().length != 0) {
1601          throw new ParseException("Command takes no arguments");
1602        }
1603      } catch (ParseException e) {
1604        System.err.println("Failed to parse command line " + e.getMessage());
1605        System.err.println();
1606        HelpFormatter formatter = new HelpFormatter();
1607        formatter.printHelp(getClass().getSimpleName(), options);
1608        System.exit(-1);
1609      }
1610
1611      Connection connection = ConnectionFactory.createConnection(getConf());
1612      Table table = connection.getTable(getTableName(getConf()));
1613
1614      Scan scan = new Scan();
1615      scan.setBatch(10000);
1616
1617      if (cmd.hasOption("s")) scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
1618
1619      if (cmd.hasOption("e")) scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
1620
1621      int limit = 0;
1622      if (cmd.hasOption("l")) limit = Integer.parseInt(cmd.getOptionValue("l"));
1623      else limit = 100;
1624
1625      ResultScanner scanner = table.getScanner(scan);
1626
1627      CINode node = new CINode();
1628      Result result = scanner.next();
1629      int count = 0;
1630      while (result != null && count++ < limit) {
1631        node = getCINode(result, node);
1632        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
1633          Bytes.toStringBinary(node.prev), node.count, node.client);
1634        result = scanner.next();
1635      }
1636      scanner.close();
1637      table.close();
1638      connection.close();
1639
1640      return 0;
1641    }
1642  }
1643
1644  /**
1645   * A stand alone program that deletes a single node.
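   * <p>
   * Illustrative invocation; the single argument is the row key in
   * {@link Bytes#toStringBinary(byte[])} form (the key below is a placeholder):
   *
   * <pre>
   * int rc = ToolRunner.run(HBaseConfiguration.create(), new Delete(),
   *   new String[] { "\\x00\\x01\\x02\\x03" });
   * </pre>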
1646   */
1647  private static class Delete extends Configured implements Tool {
1648    @Override
1649    public int run(String[] args) throws Exception {
1650      if (args.length != 1) {
1651        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
1652        return 0;
1653      }
1654      byte[] val = Bytes.toBytesBinary(args[0]);
1655
1656      org.apache.hadoop.hbase.client.Delete delete = new org.apache.hadoop.hbase.client.Delete(val);
1657
1658      try (Connection connection = ConnectionFactory.createConnection(getConf());
1659        Table table = connection.getTable(getTableName(getConf()))) {
1660        table.delete(delete);
1661      }
1662
1663      System.out.println("Delete successful");
1664      return 0;
1665    }
1666  }
1667
1668  abstract static class WalkerBase extends Configured {
1669    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
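      // Scan forward from startKey and return the first node found, printing the elapsed lookup
      // time as an "FSR" line; returns null if the scan comes back empty.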
1670      Scan scan = new Scan();
1671      scan.setStartRow(startKey);
1672      scan.setBatch(1);
1673      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1674
1675      long t1 = EnvironmentEdgeManager.currentTime();
1676      ResultScanner scanner = table.getScanner(scan);
1677      Result result = scanner.next();
1678      long t2 = EnvironmentEdgeManager.currentTime();
1679      scanner.close();
1680
1681      if (result != null) {
1682        CINode node = getCINode(result, new CINode());
1683        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1684        return node;
1685      }
1686
1687      System.out.println("FSR " + (t2 - t1));
1688
1689      return null;
1690    }
1691
1692    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1693      Get get = new Get(row);
1694      get.addColumn(FAMILY_NAME, COLUMN_PREV);
1695      Result result = table.get(get);
1696      return getCINode(result, node);
1697    }
1698  }
1699
1700  /**
1701   * A stand alone program that follows a linked list created by {@link Generator} and prints timing
1702   * info.
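   * <p>
   * Illustrative invocation that walks at most 1000 nodes from random start keys, logging every
   * 100th query:
   *
   * <pre>
   * int rc = ToolRunner.run(HBaseConfiguration.create(), new Walker(),
   *   new String[] { "-n", "1000", "-l", "100" });
   * </pre>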
1703   */
1704  private static class Walker extends WalkerBase implements Tool {
1705
1706    public Walker() {
1707    }
1708
1709    @Override
1710    public int run(String[] args) throws IOException {
1711
1712      Options options = new Options();
1713      options.addOption("n", "num", true, "number of queries");
1714      options.addOption("s", "start", true, "key to start at, binary string");
1715      options.addOption("l", "logevery", true, "log every N queries");
1716
1717      GnuParser parser = new GnuParser();
1718      CommandLine cmd = null;
1719      try {
1720        cmd = parser.parse(options, args);
1721        if (cmd.getArgs().length != 0) {
1722          throw new ParseException("Command takes no arguments");
1723        }
1724      } catch (ParseException e) {
1725        System.err.println("Failed to parse command line " + e.getMessage());
1726        System.err.println();
1727        HelpFormatter formatter = new HelpFormatter();
1728        formatter.printHelp(getClass().getSimpleName(), options);
1729        System.exit(-1);
1730      }
1731
1732      long maxQueries = Long.MAX_VALUE;
1733      if (cmd.hasOption('n')) {
1734        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1735      }
1736      boolean isSpecificStart = cmd.hasOption('s');
1737
1738      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1739      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1740
1741      Connection connection = ConnectionFactory.createConnection(getConf());
1742      Table table = connection.getTable(getTableName(getConf()));
1743      long numQueries = 0;
1744      // If isSpecificStart is set, only walk one list from that particular node.
1745      // Note that in case of circular (or P-shaped) list it will walk forever, as is
1746      // the case in normal run without startKey.
1747      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1748        if (!isSpecificStart) {
1749          startKey = new byte[ROWKEY_LENGTH];
1750          Bytes.random(startKey);
1751        }
1752        CINode node = findStartNode(table, startKey);
1753        if (node == null && isSpecificStart) {
1754          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1755        }
1756        numQueries++;
1757        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
1758          byte[] prev = node.prev;
1759          long t1 = EnvironmentEdgeManager.currentTime();
1760          node = getNode(prev, table, node);
1761          long t2 = EnvironmentEdgeManager.currentTime();
1762          if (logEvery > 0 && numQueries % logEvery == 0) {
1763            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1764          }
1765          numQueries++;
1766          if (node == null) {
1767            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1768          } else if (node.prev.length == NO_KEY.length) {
1769            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1770          }
1771        }
1772      }
1773      table.close();
1774      connection.close();
1775      return 0;
1776    }
1777  }
1778
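  /**
   * A stand alone program that drops the test table (if it exists) and recursively deletes the
   * given output directory.
   */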
1779  private static class Clean extends Configured implements Tool {
1780    @Override
1781    public int run(String[] args) throws Exception {
1782      if (args.length < 1) {
1783        System.err.println("Usage: Clean <output dir>");
1784        return -1;
1785      }
1786
1787      Path p = new Path(args[0]);
1788      Configuration conf = getConf();
1789      TableName tableName = getTableName(conf);
1790      try (FileSystem fs = HFileSystem.get(conf);
1791        Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
1792        if (admin.tableExists(tableName)) {
1793          admin.disableTable(tableName);
1794          admin.deleteTable(tableName);
1795        }
1796
1797        if (fs.exists(p)) {
1798          fs.delete(p, true);
1799        }
1800      }
1801
1802      return 0;
1803    }
1804  }
1805
1806  static TableName getTableName(Configuration conf) {
1807    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1808  }
1809
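  /**
   * Populate the given CINode from a Result, substituting NO_KEY, -1 and the empty string when
   * the prev, count or client column is missing.
   */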
1810  private static CINode getCINode(Result result, CINode node) {
1811    node.key = Bytes.copy(result.getRow());
1812    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1813      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1814    } else {
1815      node.prev = NO_KEY;
1816    }
1817    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1818      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1819    } else {
1820      node.count = -1;
1821    }
1822    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1823      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1824    } else {
1825      node.client = "";
1826    }
1827    return node;
1828  }
1829
1830  @Override
1831  public void setUpCluster() throws Exception {
1832    util = getTestingUtil(getConf());
1833    boolean isDistributed = util.isDistributedCluster();
1834    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1835    if (!isDistributed) {
1836      util.startMiniMapReduceCluster();
1837    }
1838    this.setConf(util.getConfiguration());
1839  }
1840
1841  @Override
1842  public void cleanUpCluster() throws Exception {
1843    super.cleanUpCluster();
1844    if (util.isDistributedCluster()) {
1845      util.shutdownMiniMapReduceCluster();
1846    }
1847  }
1848
1849  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
1850    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
1851  }
1852
1853  @Test
  public void testContinuousIngest() throws Exception {
1855    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1856    Configuration conf = getTestingUtil(getConf()).getConfiguration();
1857    if (isMultiUnevenColumnFamilies(getConf())) {
1858      // make sure per CF flush is on
1859      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
1860        FlushAllLargeStoresPolicy.class.getName());
1861    }
1862    int ret = ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1863      util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1864    org.junit.Assert.assertEquals(0, ret);
1865  }
1866
1867  private void usage() {
1868    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1869    printCommands();
1870  }
1871
1872  private void printCommands() {
1873    System.err.println("Commands:");
1874    System.err.println(" generator  Map only job that generates data.");
    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
    System.err.println("            look at the counts after running. REFERENCED and");
    System.err.println("            UNREFERENCED counts are ok. Any UNDEFINED counts are bad. Do");
    System.err.println("            not run concurrently with the Generator.");
1879    System.err.println(" walker     "
1880      + "Standalone program that starts following a linked list & emits timing info.");
1881    System.err.println(" print      Standalone program that prints nodes in the linked list.");
1882    System.err.println(" delete     Standalone program that deletes a single node.");
    System.err.println(" loop       Program to loop through Generator and Verify steps.");
1884    System.err.println(" clean      Program to clean all left over detritus.");
1885    System.err.println(" search     Search for missing keys.");
1886    System.err.println("");
1887    System.err.println("General options:");
1888    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
1889    System.err.println(
1890      "    Run using the <tableName> as the tablename.  Defaults to " + DEFAULT_TABLE_NAME);
1891    System.err.println(" -D" + HBaseTestingUtility.REGIONS_PER_SERVER_KEY + "=<# regions>");
1892    System.err.println("    Create table with presplit regions per server.  Defaults to "
1893      + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
1894
1895    System.err.println(" -DuseMob=<true|false>");
1896    System.err.println(
1897      "    Create table so that the mob read/write path is forced.  " + "Defaults to false");
1898
1899    System.err.flush();
1900  }
1901
1902  @Override
1903  protected void processOptions(CommandLine cmd) {
1904    super.processOptions(cmd);
1905    String[] args = cmd.getArgs();
1906    // get the class, run with the conf
1907    if (args.length < 1) {
1908      printUsage(this.getClass().getSimpleName() + " <general options> COMMAND [<COMMAND options>]",
1909        "General options:", "");
1910      printCommands();
1911      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
1912      throw new RuntimeException("Incorrect Number of args.");
1913    }
1914    toRun = args[0];
1915    otherArgs = Arrays.copyOfRange(args, 1, args.length);
1916  }
1917
1918  @Override
1919  public int runTestFromCommandLine() throws Exception {
1920    Tool tool = null;
1921    if (toRun.equalsIgnoreCase("Generator")) {
1922      tool = new Generator();
1923    } else if (toRun.equalsIgnoreCase("Verify")) {
1924      tool = new Verify();
1925    } else if (toRun.equalsIgnoreCase("Loop")) {
1926      Loop loop = new Loop();
1927      loop.it = this;
1928      tool = loop;
1929    } else if (toRun.equalsIgnoreCase("Walker")) {
1930      tool = new Walker();
1931    } else if (toRun.equalsIgnoreCase("Print")) {
1932      tool = new Print();
1933    } else if (toRun.equalsIgnoreCase("Delete")) {
1934      tool = new Delete();
1935    } else if (toRun.equalsIgnoreCase("Clean")) {
1936      tool = new Clean();
1937    } else if (toRun.equalsIgnoreCase("Search")) {
1938      tool = new Search();
1939    } else {
1940      usage();
1941      throw new RuntimeException("Unknown arg");
1942    }
1943
1944    return ToolRunner.run(getConf(), tool, otherArgs);
1945  }
1946
1947  @Override
1948  public TableName getTablename() {
1949    Configuration c = getConf();
1950    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1951  }
1952
1953  @Override
1954  protected Set<String> getColumnFamilies() {
1955    if (isMultiUnevenColumnFamilies(getConf())) {
1956      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
1957        Bytes.toString(TINY_FAMILY_NAME));
1958    } else {
1959      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1960    }
1961  }
1962
1963  private static void setJobConf(Job job, int numMappers, long numNodes, Integer width,
1964    Integer wrapMultiplier, Integer numWalkers) {
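    // A null width, wrapMultiplier or numWalkers leaves the corresponding generator setting at
    // its configured (or default) value.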
1965    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1966    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1967    if (width != null) {
1968      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1969    }
1970    if (wrapMultiplier != null) {
1971      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1972    }
1973    if (numWalkers != null) {
1974      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
1975    }
1976  }
1977
1978  public static void setJobScannerConf(Job job) {
1979    // Make sure scanners log something useful to make debugging possible.
1980    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
1981    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1982  }
1983
1984  public static void main(String[] args) throws Exception {
1985    Configuration conf = HBaseConfiguration.create();
1986    IntegrationTestingUtility.setUseDistributedCluster(conf);
1987    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1988    System.exit(ret);
1989  }
1990}