001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.security.SecureRandom;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Random;
031import java.util.Set;
032import java.util.SortedSet;
033import java.util.TreeSet;
034import java.util.UUID;
035import java.util.concurrent.ThreadLocalRandom;
036import java.util.concurrent.atomic.AtomicInteger;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.conf.Configured;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.LocatedFileStatus;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.fs.RemoteIterator;
043import org.apache.hadoop.hbase.Cell;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.IntegrationTestBase;
049import org.apache.hadoop.hbase.IntegrationTestingUtility;
050import org.apache.hadoop.hbase.MasterNotRunningException;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.Admin;
053import org.apache.hadoop.hbase.client.BufferedMutator;
054import org.apache.hadoop.hbase.client.BufferedMutatorParams;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
056import org.apache.hadoop.hbase.client.Connection;
057import org.apache.hadoop.hbase.client.ConnectionConfiguration;
058import org.apache.hadoop.hbase.client.ConnectionFactory;
059import org.apache.hadoop.hbase.client.Get;
060import org.apache.hadoop.hbase.client.Mutation;
061import org.apache.hadoop.hbase.client.Put;
062import org.apache.hadoop.hbase.client.RegionLocator;
063import org.apache.hadoop.hbase.client.Result;
064import org.apache.hadoop.hbase.client.ResultScanner;
065import org.apache.hadoop.hbase.client.Scan;
066import org.apache.hadoop.hbase.client.Table;
067import org.apache.hadoop.hbase.client.TableDescriptor;
068import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
069import org.apache.hadoop.hbase.fs.HFileSystem;
070import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
071import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
072import org.apache.hadoop.hbase.mapreduce.TableMapper;
073import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
074import org.apache.hadoop.hbase.mapreduce.WALPlayer;
075import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
076import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
077import org.apache.hadoop.hbase.testclassification.IntegrationTests;
078import org.apache.hadoop.hbase.util.AbstractHBaseTool;
079import org.apache.hadoop.hbase.util.Bytes;
080import org.apache.hadoop.hbase.util.CommonFSUtils;
081import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
082import org.apache.hadoop.hbase.util.Random64;
083import org.apache.hadoop.hbase.util.RegionSplitter;
084import org.apache.hadoop.hbase.wal.WALEdit;
085import org.apache.hadoop.hbase.wal.WALKey;
086import org.apache.hadoop.io.BytesWritable;
087import org.apache.hadoop.io.NullWritable;
088import org.apache.hadoop.io.Writable;
089import org.apache.hadoop.mapreduce.Counter;
090import org.apache.hadoop.mapreduce.CounterGroup;
091import org.apache.hadoop.mapreduce.Counters;
092import org.apache.hadoop.mapreduce.InputFormat;
093import org.apache.hadoop.mapreduce.InputSplit;
094import org.apache.hadoop.mapreduce.Job;
095import org.apache.hadoop.mapreduce.JobContext;
096import org.apache.hadoop.mapreduce.Mapper;
097import org.apache.hadoop.mapreduce.RecordReader;
098import org.apache.hadoop.mapreduce.Reducer;
099import org.apache.hadoop.mapreduce.TaskAttemptContext;
100import org.apache.hadoop.mapreduce.TaskAttemptID;
101import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
102import org.apache.hadoop.mapreduce.lib.input.FileSplit;
103import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
104import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
105import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
106import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
107import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
108import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
109import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
110import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
111import org.apache.hadoop.util.Tool;
112import org.apache.hadoop.util.ToolRunner;
113import org.junit.Test;
114import org.junit.experimental.categories.Category;
115import org.slf4j.Logger;
116import org.slf4j.LoggerFactory;
117
118import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
119import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
120import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
121import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
122import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
123import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
124import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
125
126/**
127 * <p>
128 * This is an integration test borrowed from goraci, written by Keith Turner,
129 * which is in turn inspired by the Accumulo test called continuous ingest (ci).
130 * The original source code can be found here:
131 * <ul>
132 *   <li>https://github.com/keith-turner/goraci</li>
133 *   <li>https://github.com/enis/goraci/</li>
134 * </ul>
135 * </p>
136 * <p>
137 * Apache Accumulo [0] has a simple test suite that verifies that data is not
138 * lost at scale. This test suite is called continuous ingest. This test runs
139 * many ingest clients that continually create linked lists containing 25
140 * million nodes. At some point the clients are stopped and a map reduce job is
141 * run to ensure no linked list has a hole. A hole indicates data was lost.
142 * </p>
143 * <p>
144 * The nodes in the linked list are random. This causes each linked list to
145 * spread across the table. Therefore if one part of a table loses data, then it
146 * will be detected by references in another part of the table.
147 * </p>
148 * <p>
149 * <h3>THE ANATOMY OF THE TEST</h3>
150 *
151 * Below is a rough sketch of how data is written. For specific details look at
152 * the Generator code.
153 * </p>
154 * <p>
155 * <ol>
156 * <li>Write out 1 million nodes (1M is the configurable 'width' mentioned below)</li>
157 * <li>Flush the client</li>
158 * <li>Write out 1 million more nodes that reference the previous million</li>
159 * <li>If this is the 25th set of 1 million nodes, then update the 1st set of 1
160 * million to point to the last (25 is configurable; it's the 'wrap multiplier' referred to below)</li>
161 * <li>goto 1</li>
162 * </ol>
163 * </p>
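 * <p>
 * In code, each Generator map task does roughly the following (a simplified sketch with
 * illustrative helper names, not the exact implementation):
 * <pre>
 * while (nodesWritten &lt; numNodes) {
 *   byte[][] batch = nextRandomKeys(width);          // 'width' new random row keys
 *   writeBatch(batch, prevBatch);                    // each node's 'prev' points into prevBatch
 *   flush();                                         // only flushed nodes are ever referenced
 *   if (++batches % wrapMultiplier == 0) {
 *     rewriteFirstBatchToPointAt(lastBatch);         // closes the loop of width * wrap nodes
 *   }
 *   prevBatch = batch;
 * }
 * </pre>
 * </p>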
164 * <p>
165 * The key is that nodes only reference flushed nodes. Therefore a node should
166 * never reference a missing node, even if the ingest client is killed at any
167 * point in time.
168 * </p>
169 * <p>
170 * When running this test suite w/ Accumulo there is a script running in
171 * parallel called the Agitator that randomly and continuously kills server
172 * processes. The outcome was that many data loss bugs were found in Accumulo
173 * by doing this. This test suite can also help find bugs that impact uptime
174 * and stability when run for days or weeks.
175 * </p>
176 * <p>
177 * This test suite consists of the following:
178 * <ul>
179 * <li>a few Java programs</li>
180 * <li>a little helper script to run the Java programs</li>
181 * <li>a maven script to build it</li>
182 * </ul>
183 * </p>
184 * <p>
185 * When generating data, it's best to have each map task generate a multiple of
186 * 25 million. The reason for this is that circular linked lists are generated
187 * every 25M. Not generating a multiple of 25M will result in some nodes in the
188 * linked list not having references. The loss of an unreferenced node cannot
189 * be detected.
190 * </p>
191 * <p>
192 * <h3>Below is a description of the Java programs</h3>
193 * <ul>
194 * <li>
195 * {@code Generator} - A map-only job that generates data. As stated previously, it's best to
196 * generate data in multiples of 25M. An option is also available to allow concurrent walkers to
197 * select and walk random flushed loops during this phase.
198 * </li>
199 * <li>
200 * {@code Verify} - A map reduce job that looks for holes. Look at the counts after running.
201 * {@code REFERENCED} and {@code UNREFERENCED} are ok; any {@code UNDEFINED} counts are bad. Do not
202 * run at the same time as the Generator.
203 * </li>
204 * <li>
205 * {@code Walker} - A standalone program that starts following a linked list and emits timing info.
206 * </li>
207 * <li>
208 * {@code Print} - A standalone program that prints nodes in the linked list
209 * </li>
210 * <li>
211 * {@code Delete} - A standalone program that deletes a single node
212 * </li>
213 * </ul>
214 *
215 * This class can be run as a unit test, as an integration test, or from the command line
216 * </p>
217 * <p>
218 * ex:
219 * <pre>
220 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
221 *    loop 2 1 100000 /temp 1 1000 50 1 0
222 * </pre>
223 * </p>
224 */
225@Category(IntegrationTests.class)
226public class IntegrationTestBigLinkedList extends IntegrationTestBase {
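  // Sentinel 'prev' value for nodes that do not yet have a predecessor; walkers treat a node
  // whose prev is this sentinel as a terminating node.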
227  protected static final byte[] NO_KEY = new byte[1];
228  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
229  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
230  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
231  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
232  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
233
234  //link to the id of the prev node in the linked list
235  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
236
237  //identifier of the mapred task that generated this row
238  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
239
240  //the id of the row within the same client.
241  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
242
243  /** How many rows to write per map task. This has to be a multiple of 'width' * 'wrap multiplier' (25M by default). */
244  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
245    = "IntegrationTestBigLinkedList.generator.num_rows";
246
247  private static final String GENERATOR_NUM_MAPPERS_KEY
248    = "IntegrationTestBigLinkedList.generator.map.tasks";
249
250  private static final String GENERATOR_WIDTH_KEY
251    = "IntegrationTestBigLinkedList.generator.width";
252
253  private static final String GENERATOR_WRAP_KEY
254    = "IntegrationTestBigLinkedList.generator.wrap";
255
256  private static final String CONCURRENT_WALKER_KEY
257    = "IntegrationTestBigLinkedList.generator.concurrentwalkers";
258
259  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
260
261  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
262
263  private static final int WIDTH_DEFAULT = 1000000;
264
265  /**
266   * The 'wrap multiplier' default.
267   */
268  private static final int WRAP_DEFAULT = 25;
269  private static final int ROWKEY_LENGTH = 16;
270
271  private static final int CONCURRENT_WALKER_DEFAULT = 0;
272
273  protected String toRun;
274  protected String[] otherArgs;
275
276  static class CINode {
277    byte[] key;
278    byte[] prev;
279    String client;
280    long count;
281  }
282
283  /**
284   * A map-only job that generates random linked lists and stores them.
285   */
286  static class Generator extends Configured implements Tool {
287    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
288
289    /**
290     * Set this configuration if you want to test that single column family flush works. If set, we
291     * will add a big column family and a small column family on either side of the usual ITBLL
292     * 'meta' column family. When we write out the ITBLL data, we will also write to the big column
293     * family a value bigger than the ITBLL one and, to the small family, something much smaller.
294     * The idea is that when flush-by-column-family rather than by-region is enabled, we can see if
295     * ITBLL is broken in any way. Here is how you would pass it:
296     * <p>
297     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
298     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
299     */
300    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
301        "generator.multiple.columnfamilies";
302
303    /**
304     * Set this configuration if you want to scale up the size of test data quickly.
305     * <p>
306     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
307     * -Dgenerator.big.family.value.size=1024 generator 1 10 output
308     */
309    public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";
310
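    /** Counter types updated by the concurrent walkers that run while data is being generated. */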
311    public static enum Counts {
312      SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION
313    }
314
315    public static final String USAGE =  "Usage : " + Generator.class.getSimpleName() +
316      " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" +
317      " <num walker threads>] \n" +
318      "Where <num nodes per map> should be a multiple of 'width' * 'wrap multiplier'.\n" +
319      "25M is default because default 'width' is 1M and default 'wrap multiplier' is 25.\n" +
320      "We write out 1M nodes and then flush the client. After 25 flushes, we connect \n" +
321      "first written nodes back to the 25th set.\n" +
322      "Walkers verify random flushed loops during Generation.";
323
324    public Job job;
325
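    /**
     * Input format for the random-input job: one synthetic split per requested mapper, with a
     * record reader that emits random ROWKEY_LENGTH-byte row keys instead of reading real input.
     */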
326    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
327      static class GeneratorInputSplit extends InputSplit implements Writable {
328        @Override
329        public long getLength() throws IOException, InterruptedException {
330          return 1;
331        }
332        @Override
333        public String[] getLocations() throws IOException, InterruptedException {
334          return new String[0];
335        }
336        @Override
337        public void readFields(DataInput arg0) throws IOException {
338        }
339        @Override
340        public void write(DataOutput arg0) throws IOException {
341        }
342      }
343
344      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
345        private long count;
346        private long numNodes;
347        private Random64 rand;
348
349        @Override
350        public void close() throws IOException {
351        }
352
353        @Override
354        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
355          byte[] bytes = new byte[ROWKEY_LENGTH];
356          rand.nextBytes(bytes);
357          return new BytesWritable(bytes);
358        }
359
360        @Override
361        public NullWritable getCurrentValue() throws IOException, InterruptedException {
362          return NullWritable.get();
363        }
364
365        @Override
366        public float getProgress() throws IOException, InterruptedException {
367          return (float)(count / (double)numNodes);
368        }
369
370        @Override
371        public void initialize(InputSplit arg0, TaskAttemptContext context)
372            throws IOException, InterruptedException {
373          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
374          // Use Random64 to avoid issue described in HBASE-21256.
375          rand = new Random64();
376        }
377
378        @Override
379        public boolean nextKeyValue() throws IOException, InterruptedException {
380          return count++ < numNodes;
381        }
382
383      }
384
385      @Override
386      public RecordReader<BytesWritable,NullWritable> createRecordReader(
387          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
388        GeneratorRecordReader rr = new GeneratorRecordReader();
389        rr.initialize(split, context);
390        return rr;
391      }
392
393      @Override
394      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
395        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
396
397        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);
398
399        for (int i = 0; i < numMappers; i++) {
400          splits.add(new GeneratorInputSplit());
401        }
402
403        return splits;
404      }
405    }
406
407    /** Ensure output files from prev-job go to map inputs for current job */
408    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
409      @Override
410      protected boolean isSplitable(JobContext context, Path filename) {
411        return false;
412      }
413    }
414
415    /**
416     * Some ASCII art time:
417     * <p>
418     * [ . . . ] represents one batch of random longs of length WIDTH
419     * <pre>
420     *                _________________________
421     *               |                  ______ |
422     *               |                 |      ||
423     *             .-+-----------------+-----.||
424     *             | |                 |     |||
425     * first   = [ . . . . . . . . . . . ]   |||
426     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
427     *             | | | | | | | | | | |     |||
428     * prev    = [ . . . . . . . . . . . ]   |||
429     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
430     *             | | | | | | | | | | |     |||
431     * current = [ . . . . . . . . . . . ]   |||
432     *                                       |||
433     * ...                                   |||
434     *                                       |||
435     * last    = [ . . . . . . . . . . . ]   |||
436     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
437     *             |                 |________||
438     *             |___________________________|
439     * </pre>
440     */
441
442    static class GeneratorMapper
443      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
444
445      byte[][] first = null;
446      byte[][] prev = null;
447      byte[][] current = null;
448      byte[] id;
449      long count = 0;
450      int i;
451      BufferedMutator mutator;
452      Connection connection;
453      long numNodes;
454      long wrap;
455      int width;
456      boolean multipleUnevenColumnFamilies;
457      byte[] tinyValue = new byte[] { 't' };
458      byte[] bigValue = null;
459      Configuration conf;
460
461      volatile boolean walkersStop;
462      int numWalkers;
463      volatile List<Long> flushedLoops = new ArrayList<>();
464      List<Thread> walkers = new ArrayList<>();
465
466      @Override
467      protected void setup(Context context) throws IOException, InterruptedException {
468        id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
469        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
470        instantiateHTable();
471        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
472        current = new byte[this.width][];
473        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
474        this.wrap = (long)wrapMultiplier * width;
475        this.numNodes = context.getConfiguration().getLong(
476            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
477        if (this.numNodes < this.wrap) {
478          this.wrap = this.numNodes;
479        }
480        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
481        this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
482        this.walkersStop = false;
483        this.conf = context.getConfiguration();
484
485        if (multipleUnevenColumnFamilies) {
486          int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
487          int limit = context.getConfiguration().getInt(
488            ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
489            ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);
490
491          Preconditions.checkArgument(
492            n <= limit,
493            "%s(%s) > %s(%s)",
494            BIG_FAMILY_VALUE_SIZE_KEY, n, ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);
495
496          bigValue = new byte[n];
497          ThreadLocalRandom.current().nextBytes(bigValue);
498          LOG.info("Created a bigValue with " + n + " bytes.");
499        }
500
501        Preconditions.checkArgument(
502          numNodes > 0,
503          "numNodes(%s) <= 0",
504          numNodes);
505        Preconditions.checkArgument(
506          numNodes % width == 0,
507          "numNodes(%s) mod width(%s) != 0",
508          numNodes, width);
509        Preconditions.checkArgument(
510          numNodes % wrap == 0,
511          "numNodes(%s) mod wrap(%s) != 0",
512          numNodes, wrap
513        );
514      }
515
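      /** Creates the BufferedMutator used to write the generated linked-list nodes. */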
516      protected void instantiateHTable() throws IOException {
517        mutator = connection.getBufferedMutator(
518            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
519                .writeBufferSize(4 * 1024 * 1024));
520      }
521
522      @Override
523      protected void cleanup(Context context) throws IOException, InterruptedException {
524        joinWalkers();
525        mutator.close();
526        connection.close();
527      }
528
529      @Override
530      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
531        current[i] = new byte[key.getLength()];
532        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
533        if (++i == current.length) {
534          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i={}",
535            current.length, count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i);
536          persist(output, count, prev, current, id);
537          i = 0;
538
539          if (first == null) {
540            first = current;
541          }
542          prev = current;
543          current = new byte[this.width][];
544
545          count += current.length;
546          output.setStatus("Count " + count);
547
548          if (count % wrap == 0) {
549            // This block of code turns the 1 million linked lists of length 25 into one giant
550            // circular linked list of 25 million nodes.
551            circularLeftShift(first);
552            persist(output, -1, prev, first, null);
553            // At this point the entire loop has been flushed so we can add one of its nodes to the
554            // concurrent walker
555            if (numWalkers > 0) {
556              addFlushed(key.getBytes());
557              if (walkers.isEmpty()) {
558                startWalkers(numWalkers, conf, output);
559              }
560            }
561            first = null;
562            prev = null;
563          }
564        }
565      }
566
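      /**
       * Rotates the array one position to the left (element 0 moves to the end). Shifting
       * 'first' before the wrapping persist links each chain's head to the previous chain's
       * tail, turning the 'width' parallel chains into a single circular list.
       */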
567      private static <T> void circularLeftShift(T[] first) {
568        T ez = first[0];
569        System.arraycopy(first, 1, first, 0, first.length - 1);
570        first[first.length - 1] = ez;
571      }
572
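      /** Publishes a node from a freshly flushed loop so a concurrent walker can start from it. */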
573      private void addFlushed(byte[] rowKey) {
574        synchronized (flushedLoops) {
575          flushedLoops.add(Bytes.toLong(rowKey));
576          flushedLoops.notifyAll();
577        }
578      }
579
580      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
581          throws IOException {
582        for (int i = 0; i < current.length; i++) {
583
584          if (i % 100 == 0) {
585            // Tickle progress every so often else maprunner will think us hung
586            output.progress();
587          }
588
589          Put put = new Put(current[i]);
590          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
591
592          if (count >= 0) {
593            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
594          }
595          if (id != null) {
596            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
597          }
598          // See if we are to write multiple columns.
599          if (this.multipleUnevenColumnFamilies) {
600            // Use any column name.
601            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
602            // Use any column name.
603            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
604          }
605          mutator.mutate(put);
606        }
607
608        mutator.flush();
609      }
610
611      private void startWalkers(int numWalkers, Configuration conf, Context context) {
612        LOG.info("Starting " + numWalkers + " concurrent walkers");
613        for (int i = 0; i < numWalkers; i++) {
614          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
615          walker.start();
616          walkers.add(walker);
617        }
618      }
619
620      private void joinWalkers() {
621        walkersStop = true;
622        synchronized (flushedLoops) {
623          flushedLoops.notifyAll();
624        }
625        for (Thread walker : walkers) {
626          try {
627            walker.join();
628          } catch (InterruptedException e) {
629            // no-op
630          }
631        }
632      }
633
634      /**
635       * Randomly selects and walks a flushed loop concurrently with the Generator mapper by
636       * spawning ConcurrentWalkers with specified start nodes. These ConcurrentWalkers are
637       * configured to only log erroneous nodes.
638       */
639
640      public class ContinuousConcurrentWalker implements Runnable {
641
642        ConcurrentWalker walker;
643        Configuration conf;
644        Context context;
645        Random rand;
646
647        public ContinuousConcurrentWalker(Configuration conf, Context context) {
648          this.conf = conf;
649          this.context = context;
650          rand = new Random();
651        }
652
653        @Override
654        public void run() {
655          while (!walkersStop) {
656            try {
657              long node = selectLoop();
658              try {
659                walkLoop(node);
660              } catch (IOException e) {
661                context.getCounter(Counts.IOEXCEPTION).increment(1L);
662                return;
663              }
664            } catch (InterruptedException e) {
665              return;
666            }
667          }
668        }
669
670        private void walkLoop(long node) throws IOException {
671          walker = new ConcurrentWalker(context);
672          walker.setConf(conf);
673          walker.run(node, wrap);
674        }
675
676        private long selectLoop() throws InterruptedException {
677          synchronized (flushedLoops) {
678            while (flushedLoops.isEmpty() && !walkersStop) {
679              flushedLoops.wait();
680            }
681            if (walkersStop) {
682              throw new InterruptedException();
683            }
684            return flushedLoops.get(rand.nextInt(flushedLoops.size()));
685          }
686        }
687      }
688
689      public static class ConcurrentWalker extends WalkerBase {
690
691        Context context;
692
693        public ConcurrentWalker(Context context) {this.context = context;}
694
695        public void run(long startKeyIn, long maxQueriesIn) throws IOException {
696
697          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
698          byte[] startKey = Bytes.toBytes(startKeyIn);
699
700          Connection connection = ConnectionFactory.createConnection(getConf());
701          Table table = connection.getTable(getTableName(getConf()));
702          long numQueries = 0;
703          // If isSpecificStart is set, only walk one list from that particular node.
704          // Note that in case of circular (or P-shaped) list it will walk forever, as is
705          // the case in normal run without startKey.
706
707          CINode node = findStartNode(table, startKey);
708          if (node == null) {
709            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
710            throw new IOException("Start node not found: " + startKeyIn);
711          }
712          while (numQueries < maxQueries) {
713            numQueries++;
714            byte[] prev = node.prev;
715            long t1 = EnvironmentEdgeManager.currentTime();
716            node = getNode(prev, table, node);
717            long t2 = EnvironmentEdgeManager.currentTime();
718            if (node == null) {
719              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
720              context.getCounter(Counts.UNDEFINED).increment(1L);
721            } else if (node.prev.length == NO_KEY.length) {
722              LOG.error("ConcurrentWalker found TERMINATING NODE: " +
723                  Bytes.toStringBinary(node.key));
724              context.getCounter(Counts.TERMINATING).increment(1L);
725            } else {
726              // Increment for successful walk
727              context.getCounter(Counts.SUCCESS).increment(1L);
728            }
729          }
730          table.close();
731          connection.close();
732        }
733      }
734    }
735
736    @Override
737    public int run(String[] args) throws Exception {
738      if (args.length < 3) {
739        System.err.println(USAGE);
740        return 1;
741      }
742      try {
743        int numMappers = Integer.parseInt(args[0]);
744        long numNodes = Long.parseLong(args[1]);
745        Path tmpOutput = new Path(args[2]);
746        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
747        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
748        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
749        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
750      } catch (NumberFormatException e) {
751        System.err.println("Parsing generator arguments failed: " + e.getMessage());
752        System.err.println(USAGE);
753        return 1;
754      }
755    }
756
757    protected void createSchema() throws IOException {
758      Configuration conf = getConf();
759      TableName tableName = getTableName(conf);
760      try (Connection conn = ConnectionFactory.createConnection(conf);
761          Admin admin = conn.getAdmin()) {
762        if (!admin.tableExists(tableName)) {
763          TableDescriptor tableDescriptor = TableDescriptorBuilder
764            .newBuilder(getTableName(getConf()))
765            // if -DuseMob=true force all data through mob path.
766            .setColumnFamily(
767              setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME)).build())
768            // Always add these families. Just skip writing to them when we do not test per CF
769            // flush.
770            .setColumnFamily(
771              setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(BIG_FAMILY_NAME))
772                .build())
773            .setColumnFamily(
774              setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(TINY_FAMILY_NAME))
775                .build())
776            .build();
777
778          // If we want to pre-split compute how many splits.
779          if (conf.getBoolean(HBaseTestingUtil.PRESPLIT_TEST_TABLE_KEY,
780              HBaseTestingUtil.PRESPLIT_TEST_TABLE)) {
781            int numberOfServers = admin.getRegionServers().size();
782            if (numberOfServers == 0) {
783              throw new IllegalStateException("No live regionservers");
784            }
785            int regionsPerServer = conf.getInt(HBaseTestingUtil.REGIONS_PER_SERVER_KEY,
786                HBaseTestingUtil.DEFAULT_REGIONS_PER_SERVER);
787            int totalNumberOfRegions = numberOfServers * regionsPerServer;
788            LOG.info("Number of live regionservers: " + numberOfServers + ", " +
789                "pre-splitting table into " + totalNumberOfRegions + " regions " +
790                "(default regions per server: " + regionsPerServer + ")");
791
792
793            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
794
795            admin.createTable(tableDescriptor, splits);
796          } else {
797            // Looks like we're just letting things play out.
798            // Create a table with one region by default.
799            // This will make the splitting work hard.
800            admin.createTable(tableDescriptor);
801          }
802        }
803      } catch (MasterNotRunningException e) {
804        LOG.error("Master not running", e);
805        throw new IOException(e);
806      }
807    }
808
809    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
810        Integer width, Integer wrapMultiplier, Integer numWalkers)
811        throws Exception {
812      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
813          + ", numNodes=" + numNodes);
814      Job job = Job.getInstance(getConf());
815
816      job.setJobName("Random Input Generator");
817      job.setNumReduceTasks(0);
818      job.setJarByClass(getClass());
819
820      job.setInputFormatClass(GeneratorInputFormat.class);
821      job.setOutputKeyClass(BytesWritable.class);
822      job.setOutputValueClass(NullWritable.class);
823
824      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
825
826      job.setMapperClass(Mapper.class); //identity mapper
827
828      FileOutputFormat.setOutputPath(job, tmpOutput);
829      job.setOutputFormatClass(SequenceFileOutputFormat.class);
830      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
831
832      boolean success = jobCompletion(job);
833
834      return success ? 0 : 1;
835    }
836
837    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
838        Integer width, Integer wrapMultiplier, Integer numWalkers)
839        throws Exception {
840      LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
841      createSchema();
842      job = Job.getInstance(getConf());
843
844      job.setJobName("Link Generator");
845      job.setNumReduceTasks(0);
846      job.setJarByClass(getClass());
847
848      FileInputFormat.setInputPaths(job, tmpOutput);
849      job.setInputFormatClass(OneFilePerMapperSFIF.class);
850      job.setOutputKeyClass(NullWritable.class);
851      job.setOutputValueClass(NullWritable.class);
852
853      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
854
855      setMapperForGenerator(job);
856
857      job.setOutputFormatClass(NullOutputFormat.class);
858
859      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
860      TableMapReduceUtil.addDependencyJars(job);
861      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
862                                                     AbstractHBaseTool.class);
863      TableMapReduceUtil.initCredentials(job);
864
865      boolean success = jobCompletion(job);
866
867      return success ? 0 : 1;
868    }
869
870    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
871        ClassNotFoundException {
872      boolean success = job.waitForCompletion(true);
873      return success;
874    }
875
876    protected void setMapperForGenerator(Job job) {
877      job.setMapperClass(GeneratorMapper.class);
878    }
879
880    public int run(int numMappers, long numNodes, Path tmpOutput,
881        Integer width, Integer wrapMultiplier, Integer numWalkers)
882        throws Exception {
883      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier,
884          numWalkers);
885      if (ret > 0) {
886        return ret;
887      }
888      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
889    }
890
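    /**
     * Checks the counters written by the concurrent walkers during generation; any TERMINATING,
     * UNDEFINED or IOEXCEPTION counts (or a missing Counters object) fail verification.
     */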
891    public boolean verify() {
892      try {
893        Counters counters = job.getCounters();
894        if (counters == null) {
895          LOG.info("Counters object was null, Generator verification cannot be performed."
896              + " This is commonly a result of insufficient YARN configuration.");
897          return false;
898        }
899
900        if (counters.findCounter(Counts.TERMINATING).getValue() > 0 ||
901            counters.findCounter(Counts.UNDEFINED).getValue() > 0 ||
902            counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) {
903          LOG.error("Concurrent walker failed to verify during Generation phase");
904          LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue());
905          LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue());
906          LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue());
907          return false;
908        }
909      } catch (IOException e) {
910        LOG.info("Generator verification could not find counter");
911        return false;
912      }
913      return true;
914    }
915  }
916
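  /**
   * When -DuseMob=true is set, enables MOB on the column family with a tiny threshold so that
   * effectively all values are written through the MOB path.
   */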
917  private static ColumnFamilyDescriptorBuilder setMobProperties(final Configuration conf,
918    final ColumnFamilyDescriptorBuilder builder) {
919    if (conf.getBoolean("useMob", false)) {
920      builder.setMobEnabled(true);
921      builder.setMobThreshold(4);
922    }
923    return builder;
924  }
925
926  /**
927   * Tool to search for missing rows in WALs and hfiles.
928   * Pass in a file or dir of keys to search for. The key file must have been written by the
929   * Verify step (we depend on the format it writes out). We'll read them in and then search the
930   * hbase WALs and oldWALs dirs (some of this is TODO).
931   */
932  static class Search extends Configured implements Tool {
933    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
934    protected Job job;
935
936    private static void printUsage(final String error) {
937      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
938      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
939    }
940
941    @Override
942    public int run(String[] args) throws Exception {
943      if (args.length < 1 || args.length > 2) {
944        printUsage(null);
945        return 1;
946      }
947      Path inputDir = new Path(args[0]);
948      int numMappers = 1;
949      if (args.length > 1) {
950        numMappers = Integer.parseInt(args[1]);
951      }
952      return run(inputDir, numMappers);
953    }
954
955    /**
956     * WALPlayer override that searches for keys loaded in the setup.
957     */
958    public static class WALSearcher extends WALPlayer {
959      public WALSearcher(Configuration conf) {
960        super(conf);
961      }
962
963      /**
964       * The actual searcher mapper.
965       */
966      public static class WALMapperSearcher extends WALMapper {
967        private SortedSet<byte []> keysToFind;
968        private AtomicInteger rows = new AtomicInteger(0);
969
970        @Override
971        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
972            throws IOException {
973          super.setup(context);
974          try {
975            this.keysToFind = readKeysToSearch(context.getConfiguration());
976            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
977          } catch (InterruptedException e) {
978            throw new InterruptedIOException(e.toString());
979          }
980        }
981
982        @Override
983        protected boolean filter(Context context, Cell cell) {
984          // TODO: Can I do a better compare than this copying out key?
985          byte [] row = new byte [cell.getRowLength()];
986          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
987          boolean b = this.keysToFind.contains(row);
988          if (b) {
989            String keyStr = Bytes.toStringBinary(row);
990            try {
991              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
992            } catch (IOException|InterruptedException e) {
993              LOG.warn(e.toString(), e);
994            }
995            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
996              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
997            }
998            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
999          }
1000          return b;
1001        }
1002      }
1003
1004      // Put in place the above WALMapperSearcher.
1005      @Override
1006      public Job createSubmittableJob(String[] args) throws IOException {
1007        Job job = super.createSubmittableJob(args);
1008        // Call my class instead.
1009        job.setJarByClass(WALMapperSearcher.class);
1010        job.setMapperClass(WALMapperSearcher.class);
1011        job.setOutputFormatClass(NullOutputFormat.class);
1012        return job;
1013      }
1014    }
1015
1016    static final String FOUND_GROUP_KEY = "Found";
1017    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
1018
1019    public int run(Path inputDir, int numMappers) throws Exception {
1020      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
1021      SortedSet<byte []> keys = readKeysToSearch(getConf());
1022      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
1023      LOG.info("Count of keys to find: " + keys.size());
1024      for(byte [] key: keys)  LOG.info("Key: " + Bytes.toStringBinary(key));
1025      // Now read all WALs. In two dirs. Presumes certain layout.
1026      Path walsDir = new Path(
1027          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
1028      Path oldWalsDir = new Path(
1029          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
1030      LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
1031        " against " + getConf().get(HConstants.HBASE_DIR));
1032      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
1033          new String [] {walsDir.toString(), ""});
1034      if (ret != 0) {
1035        return ret;
1036      }
1037      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
1038          new String [] {oldWalsDir.toString(), ""});
1039    }
1040
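    /**
     * Reads the keys to search for from the directory named by SEARCHER_INPUTDIR_KEY; only rows
     * the Verify step flagged as UNDEFINED are returned.
     */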
1041    static SortedSet<byte []> readKeysToSearch(final Configuration conf)
1042    throws IOException, InterruptedException {
1043      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
1044      FileSystem fs = FileSystem.get(conf);
1045      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1046      if (!fs.exists(keysInputDir)) {
1047        throw new FileNotFoundException(keysInputDir.toString());
1048      }
1049      if (!fs.isDirectory(keysInputDir)) {
1050        throw new UnsupportedOperationException("TODO");
1051      } else {
1052        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
1053        while(iterator.hasNext()) {
1054          LocatedFileStatus keyFileStatus = iterator.next();
1055          // Skip "_SUCCESS" file.
1056          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
1057          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
1058        }
1059      }
1060      return result;
1061    }
1062
1063    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
1064        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
1065        InterruptedException {
1066      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1067      // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
1068      // what is missing.
1069      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
1070      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
1071          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
1072        InputSplit is =
1073          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
1074        rr.initialize(is, context);
1075        while (rr.nextKeyValue()) {
1076          rr.getCurrentKey();
1077          BytesWritable bw = rr.getCurrentValue();
1078          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
1079            byte[] key = new byte[rr.getCurrentKey().getLength()];
1080            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
1081                .getLength());
1082            result.add(key);
1083          }
1084        }
1085      }
1086      return result;
1087    }
1088  }
1089
1090  /**
1091   * A Map Reduce job that verifies that the linked lists generated by
1092   * {@link Generator} do not have any holes.
1093   */
1094  static class Verify extends Configured implements Tool {
1095    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
1096    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
1097    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
1098    protected Job job;
1099
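    /**
     * For each row scanned, emits the row keyed by itself and flagged as defined (DEF, or
     * DEF_LOST_FAMILIES if the extra column families are missing), plus the row keyed by its
     * 'prev' pointer so the reducer can match definitions against references.
     */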
1100    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
1101      private BytesWritable row = new BytesWritable();
1102      private BytesWritable ref = new BytesWritable();
1103      private boolean multipleUnevenColumnFamilies;
1104
1105      @Override
1106      protected void setup(
1107          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
1108          throws IOException, InterruptedException {
1109        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
1110      }
1111
1112      @Override
1113      protected void map(ImmutableBytesWritable key, Result value, Context context)
1114          throws IOException, InterruptedException {
1115        byte[] rowKey = key.get();
1116        row.set(rowKey, 0, rowKey.length);
1117        if (multipleUnevenColumnFamilies
1118            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
1119              TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
1120          context.write(row, DEF_LOST_FAMILIES);
1121        } else {
1122          context.write(row, DEF);
1123        }
1124        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
1125        if (prev != null && prev.length > 0) {
1126          ref.set(prev, 0, prev.length);
1127          context.write(ref, row);
1128        } else {
1129          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
1130        }
1131      }
1132    }
1133
1134    /**
1135     * Don't change the order of these enums. Their ordinals are used as type flag when we emit
1136     * problems found from the reducer.
1137     */
1138    public static enum Counts {
1139      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
1140      LOST_FAMILIES
1141    }
1142
1143    /**
1144     * Per reducer, we output problem rows as byte arrays so they can be used as input for
1145     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a two-byte flag
1146     * saying what sort of emission it is. The flag is the Counts enum ordinal as a short.
1147     */
1148    public static class VerifyReducer extends
1149        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
1150      private ArrayList<byte[]> refs = new ArrayList<>();
1151      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
1152        Counts.UNREFERENCED.ordinal(), new byte[] {}));
1153      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
1154        Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
1155
1156      private AtomicInteger rows = new AtomicInteger(0);
1157      private Connection connection;
1158
1159      @Override
1160      protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1161      throws IOException, InterruptedException {
1162        super.setup(context);
1163        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
1164      }
1165
1166      @Override
1167      protected void cleanup(
1168          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1169          throws IOException, InterruptedException {
1170        if (this.connection != null) {
1171          this.connection.close();
1172        }
1173        super.cleanup(context);
1174      }
1175
1176      /**
1177       * @param ordinal the Counts ordinal to encode as the two-byte prefix
1178       * @param r the row bytes to append after the prefix
1179       * @return Return new byte array that has <code>ordinal</code> as prefix on front taking up
1180       * Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
1181       */
1182      public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
1183        byte[] prefix = Bytes.toBytes((short)ordinal);
1184        if (prefix.length != Bytes.SIZEOF_SHORT) {
1185          throw new RuntimeException("Unexpected size: " + prefix.length);
1186        }
1187        byte[] result = new byte[prefix.length + r.length];
1188        System.arraycopy(prefix, 0, result, 0, prefix.length);
1189        System.arraycopy(r, 0, result, prefix.length, r.length);
1190        return result;
1191      }
1192
1193      /**
1194       * @param bs flagged row bytes as written by {@link #addPrefixFlag(int, byte[])}
1195       * @return Type from the Counts enum of this row. Reads prefix added by
1196       * {@link #addPrefixFlag(int, byte[])}
1197       */
1198      public static Counts whichType(final byte [] bs) {
1199        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
1200        return Counts.values()[ordinal];
1201      }
1202
1203      /**
1204       * @param bw flagged row as emitted by the reducer
1205       * @return Row bytes minus the type flag.
1206       */
1207      public static byte[] getRowOnly(BytesWritable bw) {
1208        byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
1209        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
1210        return bytes;
1211      }
1212
1213      @Override
1214      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
1215          throws IOException, InterruptedException {
1216        int defCount = 0;
1217        boolean lostFamilies = false;
1218        refs.clear();
1219        for (BytesWritable type : values) {
1220          if (type.getLength() == DEF.getLength()) {
1221            defCount++;
1222            if (type.getBytes()[0] == 1) {
1223              lostFamilies = true;
1224            }
1225          } else {
1226            byte[] bytes = new byte[type.getLength()];
1227            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
1228            refs.add(bytes);
1229          }
1230        }
1231
1232        // TODO check for more than one def, should not happen
1233        StringBuilder refsSb = null;
1234        if (defCount == 0 || refs.size() != 1) {
1235          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1236          refsSb = dumpExtraInfoOnRefs(key, context, refs);
1237          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
1238            (refsSb != null? refsSb.toString(): ""));
1239        }
1240        if (lostFamilies) {
1241          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1242          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
1243          context.getCounter(Counts.LOST_FAMILIES).increment(1);
1244          context.write(key, LOSTFAM);
1245        }
1246
1247        if (defCount == 0 && refs.size() > 0) {
1248          // This is bad, found a node that is referenced but not defined. It must have been
1249          // lost, emit some info about this node for debugging purposes.
1250          // Write out a line per reference. If more than one, flag it.
1251          for (int i = 0; i < refs.size(); i++) {
1252            byte[] bs = refs.get(i);
1253            int ordinal;
1254            if (i <= 0) {
1255              ordinal = Counts.UNDEFINED.ordinal();
1256              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1257              context.getCounter(Counts.UNDEFINED).increment(1);
1258            } else {
1259              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
1260              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1261            }
1262          }
1263          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1264            // Print out missing row; doing get on reference gives info on when the referencer
1265            // was added which can help a little debugging. This info is only available in mapper
1266            // output -- the 'LinkedListError: key=...' log message above. What we emit here is
1267            // useless for debugging.
1268            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1269            context.getCounter("undef", keyString).increment(1);
1270          }
1271        } else if (defCount > 0 && refs.isEmpty()) {
1272          // node is defined but not referenced
1273          context.write(key, UNREF);
1274          context.getCounter(Counts.UNREFERENCED).increment(1);
1275          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1276            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1277            context.getCounter("unref", keyString).increment(1);
1278          }
1279        } else {
1280          if (refs.size() > 1) {
1281            // Skip first reference.
1282            for (int i = 1; i < refs.size(); i++) {
1283              context.write(key,
1284                new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
1285            }
1286            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
1287          }
1288          // node is defined and referenced
1289          context.getCounter(Counts.REFERENCED).increment(1);
1290        }
1291      }
1292
1293      /**
1294       * Dump out extra info around references if there are any. Helps debugging.
1295       * @return StringBuilder filled with references if any.
1296       * @throws IOException
1297       */
1298      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1299          final List<byte []> refs)
1300      throws IOException {
1301        StringBuilder refsSb = null;
1302        if (refs.isEmpty()) return refsSb;
1303        refsSb = new StringBuilder();
1304        String comma = "";
1305        // If a row is a reference but has no define, print the content of the row that has
1306        // this row as a 'prev'; it will help debug.  The missing row was written just before
1307        // the row we are dumping out here.
1308        TableName tn = getTableName(context.getConfiguration());
1309        try (Table t = this.connection.getTable(tn)) {
1310          for (byte [] ref : refs) {
1311            Result r = t.get(new Get(ref));
1312            List<Cell> cells = r.listCells();
1313            String ts = (cells != null && !cells.isEmpty()) ?
1314                new java.util.Date(cells.get(0).getTimestamp()).toString() : "";
1315            byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1316            String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : "";
1317            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1318            long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1;
1319            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1320            String refRegionLocation = "";
1321            String keyRegionLocation = "";
1322            if (b != null && b.length > 0) {
1323              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1324                HRegionLocation hrl = rl.getRegionLocation(b);
1325                if (hrl != null) refRegionLocation = hrl.toString();
1326                // Key here probably has trailing zeros on it.
1327                hrl = rl.getRegionLocation(key.getBytes());
1328                if (hrl != null) keyRegionLocation = hrl.toString();
1329              }
1330            }
1331            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
1332              ", refPrevEqualsKey=" +
1333                (b != null && Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
1334                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
1335                ", ref row date=" + ts + ", jobStr=" + jobStr +
1336                ", ref row count=" + count +
1337                ", ref row regionLocation=" + refRegionLocation +
1338                ", key row regionLocation=" + keyRegionLocation);
1339            refsSb.append(comma);
1340            comma = ",";
1341            refsSb.append(Bytes.toStringBinary(ref));
1342          }
1343        }
1344        return refsSb;
1345      }
1346    }
1347
1348    @Override
1349    public int run(String[] args) throws Exception {
1350      if (args.length != 2) {
1351        System.out.println("Usage : " + Verify.class.getSimpleName()
1352            + " <output dir> <num reducers>");
1353        return 0;
1354      }
1355
1356      String outputDir = args[0];
1357      int numReducers = Integer.parseInt(args[1]);
1358
1359      return run(outputDir, numReducers);
1360    }
1361
1362    public int run(String outputDir, int numReducers) throws Exception {
1363      return run(new Path(outputDir), numReducers);
1364    }
1365
1366    public int run(Path outputDir, int numReducers) throws Exception {
1367      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
1368
1369      job = Job.getInstance(getConf());
1370
1371      job.setJobName("Link Verifier");
1372      job.setNumReduceTasks(numReducers);
1373      job.setJarByClass(getClass());
1374
1375      setJobScannerConf(job);
1376
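      // Full-table verification scan: a large caching value cuts down on scanner RPCs, and
      // cacheBlocks=false keeps this one-off scan from churning the region server block cache.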
1377      Scan scan = new Scan();
1378      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1379      scan.setCaching(10000);
1380      scan.setCacheBlocks(false);
1381      if (isMultiUnevenColumnFamilies(getConf())) {
1382        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1383        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1384      }
1385
1386      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1387          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1388      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
1389                                                     AbstractHBaseTool.class);
1390
1391      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1392
1393      job.setReducerClass(VerifyReducer.class);
1394      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1395      job.setOutputKeyClass(BytesWritable.class);
1396      job.setOutputValueClass(BytesWritable.class);
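      // setOutputPath is the static FileOutputFormat helper (TextOutputFormat extends it), so this
      // call simply sets the output directory used by the SequenceFileAsBinaryOutputFormat above.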
1397      TextOutputFormat.setOutputPath(job, outputDir);
1398
1399      boolean success = job.waitForCompletion(true);
1400
1401      if (success) {
1402        Counters counters = job.getCounters();
1403        if (null == counters) {
1404          LOG.warn("Counters were null, cannot verify Job completion."
1405              + " This is commonly a result of insufficient YARN configuration.");
1406          // We don't have access to the counters to know if we have "bad" counts
1407          return 0;
1408        }
1409
1410        // If we find no unexpected values, the job didn't outright fail
1411        if (verifyUnexpectedValues(counters)) {
1412          // We didn't check referenced+unreferenced counts, leave that to visual inspection
1413          return 0;
1414        }
1415      }
1416
1417      // We failed
1418      return 1;
1419    }
1420
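    /**
     * Checks the counters of the completed verify job: REFERENCED must equal expectedReferenced,
     * and the UNREFERENCED, UNDEFINED and LOST_FAMILIES counts must all be zero.
     */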
1421    public boolean verify(long expectedReferenced) throws Exception {
1422      if (job == null) {
1423        throw new IllegalStateException("You should call run() first");
1424      }
1425
1426      Counters counters = job.getCounters();
1427      if (counters == null) {
1428        LOG.info("Counters object was null, write verification cannot be performed."
1429              + " This is commonly a result of insufficient YARN configuration.");
1430        return false;
1431      }
1432
1433      // Run through each check, even if we fail one early
1434      boolean success = verifyExpectedValues(expectedReferenced, counters);
1435
1436      if (!verifyUnexpectedValues(counters)) {
1437        // We found counter objects which imply failure
1438        success = false;
1439      }
1440
1441      if (!success) {
1442        handleFailure(counters);
1443      }
1444      return success;
1445    }
1446
1447    /**
1448     * Verify the values in the Counters against the expected number of entries written.
1449     *
1450     * @param expectedReferenced
1451     *          Expected number of referenced entries
1452     * @param counters
1453     *          The Job's Counters object
1454     * @return True if the values match what's expected, false otherwise
1455     */
1456    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1457      final Counter referenced = counters.findCounter(Counts.REFERENCED);
1458      final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
1459      boolean success = true;
1460
1461      if (expectedReferenced != referenced.getValue()) {
1462        LOG.error("Expected referenced count does not match actual referenced count. " +
1463            "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
1464        success = false;
1465      }
1466
1467      if (unreferenced.getValue() > 0) {
1468        final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
1469        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
1470        LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
1471            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1472        success = false;
1473      }
1474
1475      return success;
1476    }
1477
1478    /**
1479     * Verify that the Counters don't contain values which indicate an outright failure from the Reducers.
1480     *
1481     * @param counters
1482     *          The Job's counters
1483     * @return True if the "bad" counter objects are 0, false otherwise
1484     */
1485    protected boolean verifyUnexpectedValues(Counters counters) {
1486      final Counter undefined = counters.findCounter(Counts.UNDEFINED);
1487      final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
1488      boolean success = true;
1489
1490      if (undefined.getValue() > 0) {
1491        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1492        success = false;
1493      }
1494
1495      if (lostfamilies.getValue() > 0) {
1496        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1497        success = false;
1498      }
1499
1500      return success;
1501    }
1502
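    /**
     * Logs the region location of every row key recorded under the 'undef' and 'unref' counter
     * groups, to help track down where the problem rows live.
     */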
1503    protected void handleFailure(Counters counters) throws IOException {
1504      Configuration conf = job.getConfiguration();
1505      TableName tableName = getTableName(conf);
1506      try (Connection conn = ConnectionFactory.createConnection(conf)) {
1507        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
1508          CounterGroup g = counters.getGroup("undef");
1509          Iterator<Counter> it = g.iterator();
1510          while (it.hasNext()) {
1511            String keyString = it.next().getName();
1512            byte[] key = Bytes.toBytes(keyString);
1513            HRegionLocation loc = rl.getRegionLocation(key, true);
1514            LOG.error("undefined row " + keyString + ", " + loc);
1515          }
1516          g = counters.getGroup("unref");
1517          it = g.iterator();
1518          while (it.hasNext()) {
1519            String keyString = it.next().getName();
1520            byte[] key = Bytes.toBytes(keyString);
1521            HRegionLocation loc = rl.getRegionLocation(key, true);
1522            LOG.error("unreferenced row " + keyString + ", " + loc);
1523          }
1524        }
1525      }
1526    }
1527  }
1528
1529  /**
1530   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
1531   * adds more data.
1532   */
1533  static class Loop extends Configured implements Tool {
1534
1535    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
1536    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " +
1537        "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" +
1538        " <num walker threads>] \n" +
1539        "where <num nodes per mapper> should be a multiple of width*wrap multiplier, 25M by default \n" +
1540        "walkers will select and verify random flushed loops during Generation.";
1541
1542    IntegrationTestBigLinkedList it;
1543
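    /**
     * Runs a single Generator pass, writing its output under a fresh random UUID subdirectory of
     * outputDir, and throws if generation (or, when walkers are enabled, Generator.verify()) fails.
     */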
1544    protected void runGenerator(int numMappers, long numNodes,
1545        String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers)
1546        throws Exception {
1547      Path outputPath = new Path(outputDir);
1548      UUID uuid = UUID.randomUUID(); //create a random UUID.
1549      Path generatorOutput = new Path(outputPath, uuid.toString());
1550
1551      Generator generator = new Generator();
1552      generator.setConf(getConf());
1553      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
1554          numWalkers);
1555      if (retCode > 0) {
1556        throw new RuntimeException("Generator failed with return code: " + retCode);
1557      }
1558      if (numWalkers > 0) {
1559        if (!generator.verify()) {
1560          throw new RuntimeException("Generator.verify failed");
1561        }
1562      }
1563      LOG.info("Generator finished with success. Total nodes=" + numNodes);
1564    }
1565
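    /**
     * Runs a single Verify pass over a fresh random UUID subdirectory of outputDir and throws if
     * the job fails or its counters do not match expectedNumNodes.
     */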
1566    protected void runVerify(String outputDir,
1567        int numReducers, long expectedNumNodes) throws Exception {
1568      Path outputPath = new Path(outputDir);
1569      UUID uuid = UUID.randomUUID(); //create a random UUID.
1570      Path iterationOutput = new Path(outputPath, uuid.toString());
1571
1572      Verify verify = new Verify();
1573      verify.setConf(getConf());
1574      int retCode = verify.run(iterationOutput, numReducers);
1575      if (retCode > 0) {
1576        throw new RuntimeException("Verify.run failed with return code: " + retCode);
1577      }
1578
1579      if (!verify.verify(expectedNumNodes)) {
1580        throw new RuntimeException("Verify.verify failed");
1581      }
1582      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
1583    }
1584
1585    @Override
1586    public int run(String[] args) throws Exception {
1587      if (args.length < 5) {
1588        System.err.println(USAGE);
1589        return 1;
1590      }
1591      try {
1592        int numIterations = Integer.parseInt(args[0]);
1593        int numMappers = Integer.parseInt(args[1]);
1594        long numNodes = Long.parseLong(args[2]);
1595        String outputDir = args[3];
1596        int numReducers = Integer.parseInt(args[4]);
1597        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1598        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1599        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
1600
1601        long expectedNumNodes = 0;
1602
1603        if (numIterations < 0) {
1604          numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
1605        }
1606        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
1607        for (int i = 0; i < numIterations; i++) {
1608          LOG.info("Starting iteration = " + i);
1609          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
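          // Data is not cleaned between iterations, so each Verify expects the sum of all nodes
          // generated so far.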
1610          expectedNumNodes += numMappers * numNodes;
1611          runVerify(outputDir, numReducers, expectedNumNodes);
1612        }
1613        return 0;
1614      } catch (NumberFormatException e) {
1615        System.err.println("Parsing loop arguments failed: " + e.getMessage());
1616        System.err.println(USAGE);
1617        return 1;
1618      }
1619    }
1620  }
1621
1622  /**
1623   * A stand alone program that prints out portions of a list created by {@link Generator}.
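   * Accepts -s/--start and -e/--end row keys (as parsed by Bytes.toBytesBinary) and -l/--limit
   * for the number of rows to print (default 100).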
1624   */
1625  private static class Print extends Configured implements Tool {
1626    @Override
1627    public int run(String[] args) throws Exception {
1628      Options options = new Options();
1629      options.addOption("s", "start", true, "start key");
1630      options.addOption("e", "end", true, "end key");
1631      options.addOption("l", "limit", true, "number to print");
1632
1633      GnuParser parser = new GnuParser();
1634      CommandLine cmd = null;
1635      try {
1636        cmd = parser.parse(options, args);
1637        if (cmd.getArgs().length != 0) {
1638          throw new ParseException("Command takes no arguments");
1639        }
1640      } catch (ParseException e) {
1641        System.err.println("Failed to parse command line " + e.getMessage());
1642        System.err.println();
1643        HelpFormatter formatter = new HelpFormatter();
1644        formatter.printHelp(getClass().getSimpleName(), options);
1645        System.exit(-1);
1646      }
1647
1648      Connection connection = ConnectionFactory.createConnection(getConf());
1649      Table table = connection.getTable(getTableName(getConf()));
1650
1651      Scan scan = new Scan();
1652      scan.setBatch(10000);
1653
1654      if (cmd.hasOption("s")) {
1655        scan.withStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
      }
1656
1657      if (cmd.hasOption("e")) {
1658        scan.withStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
1659      }
1660
1661      int limit = cmd.hasOption("l") ? Integer.parseInt(cmd.getOptionValue("l")) : 100;
1666
1667      ResultScanner scanner = table.getScanner(scan);
1668
1669      CINode node = new CINode();
1670      Result result = scanner.next();
1671      int count = 0;
1672      while (result != null && count++ < limit) {
1673        node = getCINode(result, node);
1674        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
1675            Bytes.toStringBinary(node.prev), node.count, node.client);
1676        result = scanner.next();
1677      }
1678      scanner.close();
1679      table.close();
1680      connection.close();
1681
1682      return 0;
1683    }
1684  }
1685
1686  /**
1687   * A stand alone program that deletes a single node.
1688   */
1689  private static class Delete extends Configured implements Tool {
1690    @Override
1691    public int run(String[] args) throws Exception {
1692      if (args.length != 1) {
1693        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
1694        return 0;
1695      }
1696      byte[] val = Bytes.toBytesBinary(args[0]);
1697
1698      org.apache.hadoop.hbase.client.Delete delete
1699        = new org.apache.hadoop.hbase.client.Delete(val);
1700
1701      try (Connection connection = ConnectionFactory.createConnection(getConf());
1702          Table table = connection.getTable(getTableName(getConf()))) {
1703        table.delete(delete);
1704      }
1705
1706      System.out.println("Delete successful");
1707      return 0;
1708    }
1709  }
1710
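  /**
   * Base class for the walker tools. Provides helpers to find a start node at or after a given
   * row key and to fetch a single node by exact row key.
   */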
1711  abstract static class WalkerBase extends Configured {
1712    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
1713      Scan scan = new Scan();
1714      scan.withStartRow(startKey);
1715      scan.setBatch(1);
1716      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1717
1718      long t1 = EnvironmentEdgeManager.currentTime();
1719      ResultScanner scanner = table.getScanner(scan);
1720      Result result = scanner.next();
1721      long t2 = EnvironmentEdgeManager.currentTime();
1722      scanner.close();
1723
1724      if (result != null) {
1725        CINode node = getCINode(result, new CINode());
1726        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1727        return node;
1728      }
1729
1730      System.out.println("FSR " + (t2 - t1));
1731
1732      return null;
1733    }
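
    /** Fetches a single node by row key, reading only the prev (FAMILY_NAME:COLUMN_PREV) column. */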
1734    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1735      Get get = new Get(row);
1736      get.addColumn(FAMILY_NAME, COLUMN_PREV);
1737      Result result = table.get(get);
1738      return getCINode(result, node);
1739    }
1740  }
1741  /**
1742   * A stand alone program that follows a linked list created by {@link Generator} and prints
1743   * timing info.
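   * Accepts -n/--num for the maximum number of queries, -s/--start for a specific start key
   * (binary string), and -l/--logevery to log every N queries.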
1744   */
1745  private static class Walker extends WalkerBase implements Tool {
1746
1747    public Walker() {}
1748
1749    @Override
1750    public int run(String[] args) throws IOException {
1751
1752      Options options = new Options();
1753      options.addOption("n", "num", true, "number of queries");
1754      options.addOption("s", "start", true, "key to start at, binary string");
1755      options.addOption("l", "logevery", true, "log every N queries");
1756
1757      GnuParser parser = new GnuParser();
1758      CommandLine cmd = null;
1759      try {
1760        cmd = parser.parse(options, args);
1761        if (cmd.getArgs().length != 0) {
1762          throw new ParseException("Command takes no arguments");
1763        }
1764      } catch (ParseException e) {
1765        System.err.println("Failed to parse command line " + e.getMessage());
1766        System.err.println();
1767        HelpFormatter formatter = new HelpFormatter();
1768        formatter.printHelp(getClass().getSimpleName(), options);
1769        System.exit(-1);
1770      }
1771
1772      long maxQueries = Long.MAX_VALUE;
1773      if (cmd.hasOption('n')) {
1774        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1775      }
1776      Random rand = new SecureRandom();
1777      boolean isSpecificStart = cmd.hasOption('s');
1778
1779      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1780      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1781
1782      Connection connection = ConnectionFactory.createConnection(getConf());
1783      Table table = connection.getTable(getTableName(getConf()));
1784      long numQueries = 0;
1785      // If isSpecificStart is set, only walk one list from that particular node.
1786      // Note that in the case of a circular (or P-shaped) list it will walk forever, just as
1787      // a normal run without a startKey does.
1788      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1789        if (!isSpecificStart) {
1790          startKey = new byte[ROWKEY_LENGTH];
1791          rand.nextBytes(startKey);
1792        }
1793        CINode node = findStartNode(table, startKey);
1794        if (node == null && isSpecificStart) {
1795          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1796        }
1797        numQueries++;
1798        while (node != null && node.prev.length != NO_KEY.length &&
1799            numQueries < maxQueries) {
1800          byte[] prev = node.prev;
1801          long t1 = EnvironmentEdgeManager.currentTime();
1802          node = getNode(prev, table, node);
1803          long t2 = EnvironmentEdgeManager.currentTime();
1804          if (logEvery > 0 && numQueries % logEvery == 0) {
1805            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1806          }
1807          numQueries++;
1808          if (node == null) {
1809            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1810          } else if (node.prev.length == NO_KEY.length) {
1811            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1812          }
1813        }
1814      }
1815      table.close();
1816      connection.close();
1817      return 0;
1818    }
1819  }
1820
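  /**
   * A stand alone program that disables and drops the test table if it exists and deletes the
   * given output directory.
   */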
1821  private static class Clean extends Configured implements Tool {
1822    @Override public int run(String[] args) throws Exception {
1823      if (args.length < 1) {
1824        System.err.println("Usage: Clean <output dir>");
1825        return -1;
1826      }
1827
1828      Path p = new Path(args[0]);
1829      Configuration conf = getConf();
1830      TableName tableName = getTableName(conf);
1831      try (FileSystem fs = HFileSystem.get(conf);
1832          Connection conn = ConnectionFactory.createConnection(conf);
1833          Admin admin = conn.getAdmin()) {
1834        if (admin.tableExists(tableName)) {
1835          admin.disableTable(tableName);
1836          admin.deleteTable(tableName);
1837        }
1838
1839        if (fs.exists(p)) {
1840          fs.delete(p, true);
1841        }
1842      }
1843
1844      return 0;
1845    }
1846  }
1847
1848  static TableName getTableName(Configuration conf) {
1849    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1850  }
1851
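  /** Populates the given CINode from a Result, filling in defaults for any missing columns. */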
1852  private static CINode getCINode(Result result, CINode node) {
1853    node.key = Bytes.copy(result.getRow());
1854    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1855      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1856    } else {
1857      node.prev = NO_KEY;
1858    }
1859    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1860      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1861    } else {
1862      node.count = -1;
1863    }
1864    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1865      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1866    } else {
1867      node.client = "";
1868    }
1869    return node;
1870  }
1871
1872  protected IntegrationTestingUtility util;
1873
1874  @Override
1875  public void setUpCluster() throws Exception {
1876    util = getTestingUtil(getConf());
1877    boolean isDistributed = util.isDistributedCluster();
1878    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1879    if (!isDistributed) {
1880      util.startMiniMapReduceCluster();
1881    }
1882    this.setConf(util.getConfiguration());
1883  }
1884
1885  @Override
1886  public void cleanUpCluster() throws Exception {
1887    super.cleanUpCluster();
1888    if (util.isDistributedCluster()) {
1889      util.shutdownMiniMapReduceCluster();
1890    }
1891  }
1892
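  /** Whether the test should exercise the additional BIG and TINY column families. */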
1893  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
1894    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
1895  }
1896
1897  @Test
1898  public void testContinuousIngest() throws Exception {
1899    //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1900    Configuration conf = getTestingUtil(getConf()).getConfiguration();
1901    if (isMultiUnevenColumnFamilies(getConf())) {
1902      // make sure per CF flush is on
1903      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
1904    }
1905    int ret =
1906        ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1907            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1908    org.junit.Assert.assertEquals(0, ret);
1909  }
1910
1911  private void usage() {
1912    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1913    printCommands();
1914  }
1915
1916  private void printCommands() {
1917    System.err.println("Commands:");
1918    System.err.println(" generator  Map only job that generates data.");
1919    System.err.println(" verify     A map reduce job that looks for holes. Check the return code and");
1920    System.err.println("            look at the counts after running. REFERENCED and UNREFERENCED");
1921    System.err.println("            counts are ok; any UNDEFINED counts are bad. Do not run");
1922    System.err.println("            concurrently with the Generator.");
1923    System.err.println(" walker     " +
1924      "Standalone program that starts following a linked list & emits timing info.");
1925    System.err.println(" print      Standalone program that prints nodes in the linked list.");
1926    System.err.println(" delete     Standalone program that deletes a single node.");
1927    System.err.println(" loop       Program that loops through the Generator and Verify steps.");
1928    System.err.println(" clean      Program to clean all left over detritus.");
1929    System.err.println(" search     Search for missing keys.");
1930    System.err.println("");
1931    System.err.println("General options:");
1932    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
1933    System.err.println("    Run using <tableName> as the table name.  Defaults to "
1934        + DEFAULT_TABLE_NAME);
1935    System.err.println(" -D" + HBaseTestingUtil.REGIONS_PER_SERVER_KEY + "=<# regions>");
1936    System.err.println("    Create the table with this many presplit regions per server.  Defaults to "
1937        + HBaseTestingUtil.DEFAULT_REGIONS_PER_SERVER);
1938
1939    System.err.println(" -DuseMob=<true|false>");
1940    System.err.println("    Create table so that the mob read/write path is forced.  " +
1941        "Defaults to false");
1942
1943    System.err.flush();
1944  }
1945
1946  @Override
1947  protected void processOptions(CommandLine cmd) {
1948    super.processOptions(cmd);
1949    String[] args = cmd.getArgs();
1950    //get the class, run with the conf
1951    if (args.length < 1) {
1952      printUsage(this.getClass().getSimpleName() +
1953        " <general options> COMMAND [<COMMAND options>]", "General options:", "");
1954      printCommands();
1955      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
1956      throw new RuntimeException("Incorrect Number of args.");
1957    }
1958    toRun = args[0];
1959    otherArgs = Arrays.copyOfRange(args, 1, args.length);
1960  }
1961
1962  @Override
1963  public int runTestFromCommandLine() throws Exception {
1964    Tool tool = null;
1965    if (toRun.equalsIgnoreCase("Generator")) {
1966      tool = new Generator();
1967    } else if (toRun.equalsIgnoreCase("Verify")) {
1968      tool = new Verify();
1969    } else if (toRun.equalsIgnoreCase("Loop")) {
1970      Loop loop = new Loop();
1971      loop.it = this;
1972      tool = loop;
1973    } else if (toRun.equalsIgnoreCase("Walker")) {
1974      tool = new Walker();
1975    } else if (toRun.equalsIgnoreCase("Print")) {
1976      tool = new Print();
1977    } else if (toRun.equalsIgnoreCase("Delete")) {
1978      tool = new Delete();
1979    } else if (toRun.equalsIgnoreCase("Clean")) {
1980      tool = new Clean();
1981    } else if (toRun.equalsIgnoreCase("Search")) {
1982      tool = new Search();
1983    } else {
1984      usage();
1985      throw new RuntimeException("Unknown arg");
1986    }
1987
1988    return ToolRunner.run(getConf(), tool, otherArgs);
1989  }
1990
1991  @Override
1992  public TableName getTablename() {
1993    Configuration c = getConf();
1994    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1995  }
1996
1997  @Override
1998  protected Set<String> getColumnFamilies() {
1999    if (isMultiUnevenColumnFamilies(getConf())) {
2000      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
2001        Bytes.toString(TINY_FAMILY_NAME));
2002    } else {
2003      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
2004    }
2005  }
2006
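  /**
   * Copies the Generator parameters into the job configuration; width, wrap multiplier and walker
   * count are only set when non-null.
   */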
2007  private static void setJobConf(Job job, int numMappers, long numNodes,
2008      Integer width, Integer wrapMultiplier, Integer numWalkers) {
2009    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
2010    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
2011    if (width != null) {
2012      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
2013    }
2014    if (wrapMultiplier != null) {
2015      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
2016    }
2017    if (numWalkers != null) {
2018      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
2019    }
2020  }
2021
2022  public static void setJobScannerConf(Job job) {
2023    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
2024  }
2025
2026  public static void main(String[] args) throws Exception {
2027    Configuration conf = HBaseConfiguration.create();
2028    IntegrationTestingUtility.setUseDistributedCluster(conf);
2029    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
2030    System.exit(ret);
2031  }
2032}