001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.security.SecureRandom;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Random;
031import java.util.Set;
032import java.util.SortedSet;
033import java.util.TreeSet;
034import java.util.UUID;
035import java.util.concurrent.ThreadLocalRandom;
036import java.util.concurrent.atomic.AtomicInteger;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.conf.Configured;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.LocatedFileStatus;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.fs.RemoteIterator;
043import org.apache.hadoop.hbase.Cell;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.HColumnDescriptor;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.HTableDescriptor;
050import org.apache.hadoop.hbase.IntegrationTestBase;
051import org.apache.hadoop.hbase.IntegrationTestingUtility;
052import org.apache.hadoop.hbase.MasterNotRunningException;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.client.Admin;
055import org.apache.hadoop.hbase.client.BufferedMutator;
056import org.apache.hadoop.hbase.client.BufferedMutatorParams;
057import org.apache.hadoop.hbase.client.Connection;
058import org.apache.hadoop.hbase.client.ConnectionConfiguration;
059import org.apache.hadoop.hbase.client.ConnectionFactory;
060import org.apache.hadoop.hbase.client.Get;
061import org.apache.hadoop.hbase.client.Mutation;
062import org.apache.hadoop.hbase.client.Put;
063import org.apache.hadoop.hbase.client.RegionLocator;
064import org.apache.hadoop.hbase.client.Result;
065import org.apache.hadoop.hbase.client.ResultScanner;
066import org.apache.hadoop.hbase.client.Scan;
067import org.apache.hadoop.hbase.client.ScannerCallable;
068import org.apache.hadoop.hbase.client.Table;
069import org.apache.hadoop.hbase.fs.HFileSystem;
070import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
071import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
072import org.apache.hadoop.hbase.mapreduce.TableMapper;
073import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
074import org.apache.hadoop.hbase.mapreduce.WALPlayer;
075import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
076import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
077import org.apache.hadoop.hbase.testclassification.IntegrationTests;
078import org.apache.hadoop.hbase.util.AbstractHBaseTool;
079import org.apache.hadoop.hbase.util.Bytes;
080import org.apache.hadoop.hbase.util.CommonFSUtils;
081import org.apache.hadoop.hbase.util.Random64;
082import org.apache.hadoop.hbase.util.RegionSplitter;
083import org.apache.hadoop.hbase.wal.WALEdit;
084import org.apache.hadoop.hbase.wal.WALKey;
085import org.apache.hadoop.io.BytesWritable;
086import org.apache.hadoop.io.NullWritable;
087import org.apache.hadoop.io.Writable;
088import org.apache.hadoop.mapreduce.Counter;
089import org.apache.hadoop.mapreduce.CounterGroup;
090import org.apache.hadoop.mapreduce.Counters;
091import org.apache.hadoop.mapreduce.InputFormat;
092import org.apache.hadoop.mapreduce.InputSplit;
093import org.apache.hadoop.mapreduce.Job;
094import org.apache.hadoop.mapreduce.JobContext;
095import org.apache.hadoop.mapreduce.Mapper;
096import org.apache.hadoop.mapreduce.RecordReader;
097import org.apache.hadoop.mapreduce.Reducer;
098import org.apache.hadoop.mapreduce.TaskAttemptContext;
099import org.apache.hadoop.mapreduce.TaskAttemptID;
100import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
101import org.apache.hadoop.mapreduce.lib.input.FileSplit;
102import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
103import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
104import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
105import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
106import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
107import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
108import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
109import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
110import org.apache.hadoop.util.Tool;
111import org.apache.hadoop.util.ToolRunner;
112import org.junit.Test;
113import org.junit.experimental.categories.Category;
114import org.slf4j.Logger;
115import org.slf4j.LoggerFactory;
116
117import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
118import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
119import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
120import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
121import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
122import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
123import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
124
/**
 * This is an integration test borrowed from goraci, written by Keith Turner,
 * which is in turn inspired by the Accumulo test called continuous ingest (ci).
 * The original source code can be found here:
 * https://github.com/keith-turner/goraci
 * https://github.com/enis/goraci/
 *
 * Apache Accumulo [0] has a simple test suite that verifies that data is not
 * lost at scale. This test suite is called continuous ingest. This test runs
 * many ingest clients that continually create linked lists containing 25
 * million nodes. At some point the clients are stopped and a map reduce job is
 * run to ensure no linked list has a hole. A hole indicates data was lost.
 *
 * The nodes in the linked list are random. This causes each linked list to
 * spread across the table. Therefore if one part of a table loses data, then it
 * will be detected by references in another part of the table.
 *
 * THE ANATOMY OF THE TEST
 *
 * Below is a rough sketch of how data is written. For specific details look at
 * the Generator code.
 *
 * 1 Write out 1 million nodes
 * 2 Flush the client
 * 3 Write out 1 million that reference the previous million
 * 4 If this is the 25th set of 1 million nodes, then update the 1st set of 1
 *   million to point to the last
 * 5 goto 1
 *
 * The key is that nodes only reference flushed nodes. Therefore a node should
 * never reference a missing node, even if the ingest client is killed at any
 * point in time.
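 *
 * A rough sketch, in simplified pseudo-Java, of the loop above (the helper names below are
 * illustrative only; the real logic lives in Generator.GeneratorMapper):
 * <pre>
 * byte[][] first = null, prev = null;
 * long written = 0;
 * while (written &lt; numNodes) {
 *   byte[][] current = randomKeys(width);  // 1. a new batch of 'width' random node keys
 *   persistAndFlush(current, prev);        // 2./3. each node's 'prev' column points at the
 *                                          //       previous, already flushed, batch
 *   if (first == null) first = current;
 *   prev = current;
 *   written += width;
 *   if (written % wrap == 0) {             // 4. every 25M nodes by default (wrap = width * 25),
 *     persistAndFlush(first, prev);        //    re-write the first batch to point at the last,
 *     first = null;                        //    closing one circular linked list
 *     prev = null;
 *   }
 * }
 * </pre>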
 *
 * When running this test suite w/ Accumulo there is a script running in
 * parallel called the Agitator that randomly and continuously kills server
 * processes. The outcome was that many data loss bugs were found in Accumulo
 * by doing this. This test suite can also help find bugs that impact uptime
 * and stability when run for days or weeks.
 *
 * This test suite consists of the following:
 * - a few Java programs
 * - a little helper script to run the Java programs
 * - a maven script to build it
 *
 * When generating data, it is best to have each map task generate a multiple of
 * 25 million. The reason for this is that circular linked lists are generated
 * every 25M. Not generating a multiple of 25M will result in some nodes in the
 * linked list not having references. The loss of an unreferenced node cannot
 * be detected.
 *
 *
 * Below is a description of the Java programs
 *
 * Generator - A map only job that generates data. As stated previously, it is best to generate
 * data in multiples of 25M. An option is also available to allow concurrent walkers to select
 * and walk random flushed loops during this phase.
 *
 * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED
 * and UNREFERENCED are ok, any UNDEFINED counts are bad. Do not run at the same
 * time as the Generator.
 *
 * Walker - A standalone program that starts following a linked list and emits timing info.
 *
 * Print - A standalone program that prints nodes in the linked list
 *
 * Delete - A standalone program that deletes a single node
 *
 * This class can be run as a unit test, as an integration test, or from the command line
 *
 * ex:
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
 *    loop 2 1 100000 /temp 1 1000 50 1 0
 *
 */
194@Category(IntegrationTests.class)
195public class IntegrationTestBigLinkedList extends IntegrationTestBase {
196  protected static final byte[] NO_KEY = new byte[1];
197
198  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
199
200  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
201
202  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
203  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
204  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
205
206  //link to the id of the prev node in the linked list
207  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
208
209  //identifier of the mapred task that generated this row
210  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
211
212  //the id of the row within the same client.
213  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
214
215  /** How many rows to write per map task. This has to be a multiple of 25M */
216  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
217    = "IntegrationTestBigLinkedList.generator.num_rows";
218
219  private static final String GENERATOR_NUM_MAPPERS_KEY
220    = "IntegrationTestBigLinkedList.generator.map.tasks";
221
222  private static final String GENERATOR_WIDTH_KEY
223    = "IntegrationTestBigLinkedList.generator.width";
224
225  private static final String GENERATOR_WRAP_KEY
226    = "IntegrationTestBigLinkedList.generator.wrap";
227
228  private static final String CONCURRENT_WALKER_KEY
229    = "IntegrationTestBigLinkedList.generator.concurrentwalkers";
230
231  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
232
233  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
234
235  private static final int WIDTH_DEFAULT = 1000000;
236  private static final int WRAP_DEFAULT = 25;
237  private static final int ROWKEY_LENGTH = 16;
238
239  private static final int CONCURRENT_WALKER_DEFAULT = 0;
240
241  protected String toRun;
242  protected String[] otherArgs;
243
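  // In-memory representation of a single linked-list node as read back from the table.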
244  static class CINode {
245    byte[] key;
246    byte[] prev;
247    String client;
248    long count;
249  }
250
251  /**
   * A Map only job that generates random linked lists and stores them.
253   */
254  static class Generator extends Configured implements Tool {
255
256    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
257
258    /**
     * Set this configuration if you want to test that single-column-family flush works. If set,
     * we will add a big column family and a small column family on either side of the usual ITBLL
     * 'meta' column family. When we write out the ITBLL, we will also add to the big column family
     * a value bigger than that for ITBLL and, for the small one, something way smaller. The idea
     * is that when flush-by-column-family rather than by region is enabled, we can see if ITBLL is
     * broken in any way. Here is how you would pass it:
265     * <p>
266     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
267     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
268     */
269    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
270        "generator.multiple.columnfamilies";
271
272    /**
273     * Set this configuration if you want to scale up the size of test data quickly.
274     * <p>
275     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
276     * -Dgenerator.big.family.value.size=1024 generator 1 10 output
277     */
278    public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";
279
280
281    public static enum Counts {
282      SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION
283    }
284
    public static final String USAGE = "Usage : " + Generator.class.getSimpleName() +
        " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" +
        " <num walker threads>] \n" +
        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
        "walkers will verify random flushed loops during generation.";
290
291    public Job job;
292
293    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
294      static class GeneratorInputSplit extends InputSplit implements Writable {
295        @Override
296        public long getLength() throws IOException, InterruptedException {
297          return 1;
298        }
299        @Override
300        public String[] getLocations() throws IOException, InterruptedException {
301          return new String[0];
302        }
303        @Override
304        public void readFields(DataInput arg0) throws IOException {
305        }
306        @Override
307        public void write(DataOutput arg0) throws IOException {
308        }
309      }
310
311      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
312        private long count;
313        private long numNodes;
314        private Random64 rand;
315
316        @Override
317        public void close() throws IOException {
318        }
319
320        @Override
321        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
322          byte[] bytes = new byte[ROWKEY_LENGTH];
323          rand.nextBytes(bytes);
324          return new BytesWritable(bytes);
325        }
326
327        @Override
328        public NullWritable getCurrentValue() throws IOException, InterruptedException {
329          return NullWritable.get();
330        }
331
332        @Override
333        public float getProgress() throws IOException, InterruptedException {
334          return (float)(count / (double)numNodes);
335        }
336
337        @Override
338        public void initialize(InputSplit arg0, TaskAttemptContext context)
339            throws IOException, InterruptedException {
340          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
341          // Use Random64 to avoid issue described in HBASE-21256.
342          rand = new Random64();
343        }
344
345        @Override
346        public boolean nextKeyValue() throws IOException, InterruptedException {
347          return count++ < numNodes;
348        }
349
350      }
351
352      @Override
353      public RecordReader<BytesWritable,NullWritable> createRecordReader(
354          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
355        GeneratorRecordReader rr = new GeneratorRecordReader();
356        rr.initialize(split, context);
357        return rr;
358      }
359
360      @Override
361      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
362        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
363
364        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);
365
366        for (int i = 0; i < numMappers; i++) {
367          splits.add(new GeneratorInputSplit());
368        }
369
370        return splits;
371      }
372    }
373
    /**
     * Ensure output files from the prev job go one-to-one to the map inputs of the current job;
     * the files are not splittable.
     */
375    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
376      @Override
377      protected boolean isSplitable(JobContext context, Path filename) {
378        return false;
379      }
380    }
381
382    /**
383     * Some ASCII art time:
384     * <p>
385     * [ . . . ] represents one batch of random longs of length WIDTH
386     * <pre>
387     *                _________________________
388     *               |                  ______ |
389     *               |                 |      ||
390     *             .-+-----------------+-----.||
391     *             | |                 |     |||
392     * first   = [ . . . . . . . . . . . ]   |||
393     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
394     *             | | | | | | | | | | |     |||
395     * prev    = [ . . . . . . . . . . . ]   |||
396     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
397     *             | | | | | | | | | | |     |||
398     * current = [ . . . . . . . . . . . ]   |||
399     *                                       |||
400     * ...                                   |||
401     *                                       |||
402     * last    = [ . . . . . . . . . . . ]   |||
403     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
404     *             |                 |________||
405     *             |___________________________|
406     * </pre>
407     */
408
409    static class GeneratorMapper
410      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
411
412      byte[][] first = null;
413      byte[][] prev = null;
414      byte[][] current = null;
415      byte[] id;
416      long count = 0;
417      int i;
418      BufferedMutator mutator;
419      Connection connection;
420      long numNodes;
421      long wrap;
422      int width;
423      boolean multipleUnevenColumnFamilies;
424      byte[] tinyValue = new byte[] { 't' };
425      byte[] bigValue = null;
426      Configuration conf;
427
428      volatile boolean walkersStop;
429      int numWalkers;
430      volatile List<Long> flushedLoops = new ArrayList<>();
431      List<Thread> walkers = new ArrayList<>();
432
433      @Override
434      protected void setup(Context context) throws IOException, InterruptedException {
435        id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
436        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
437        instantiateHTable();
438        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
439        current = new byte[this.width][];
440        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
441        this.wrap = (long)wrapMultiplier * width;
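        // With the defaults (width=1,000,000, wrapMultiplier=25) this gives wrap=25,000,000,
        // i.e. one circular linked list is closed for every 25M nodes written.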
442        this.numNodes = context.getConfiguration().getLong(
443            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
444        if (this.numNodes < this.wrap) {
445          this.wrap = this.numNodes;
446        }
447        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
448        this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
449        this.walkersStop = false;
450        this.conf = context.getConfiguration();
451
452        if (multipleUnevenColumnFamilies) {
453          int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
454          int limit = context.getConfiguration().getInt(
455            ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
456            ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);
457
458          Preconditions.checkArgument(
459            n <= limit,
460            "%s(%s) > %s(%s)",
461            BIG_FAMILY_VALUE_SIZE_KEY, n, ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);
462
463          bigValue = new byte[n];
464          ThreadLocalRandom.current().nextBytes(bigValue);
465          LOG.info("Create a bigValue with " + n + " bytes.");
466        }
467
468        Preconditions.checkArgument(
469          numNodes > 0,
470          "numNodes(%s) <= 0",
471          numNodes);
472        Preconditions.checkArgument(
473          numNodes % width == 0,
474          "numNodes(%s) mod width(%s) != 0",
475          numNodes, width);
476        Preconditions.checkArgument(
477          numNodes % wrap == 0,
478          "numNodes(%s) mod wrap(%s) != 0",
479          numNodes, wrap
480        );
481      }
482
483      protected void instantiateHTable() throws IOException {
484        mutator = connection.getBufferedMutator(
485            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
486                .writeBufferSize(4 * 1024 * 1024));
487      }
488
489      @Override
490      protected void cleanup(Context context) throws IOException ,InterruptedException {
491        joinWalkers();
492        mutator.close();
493        connection.close();
494      }
495
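      // map() buffers 'width' random keys per batch; each full batch is persisted with every
      // node's 'prev' column pointing at the previous (already flushed) batch, and every 'wrap'
      // nodes the first batch is re-written to point at the last one, closing a circular list.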
496      @Override
497      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
498        current[i] = new byte[key.getLength()];
499        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
500        if (++i == current.length) {
          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i={}",
            current.length, count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i);
503          persist(output, count, prev, current, id);
504          i = 0;
505
506          if (first == null) {
507            first = current;
508          }
509          prev = current;
510          current = new byte[this.width][];
511
512          count += current.length;
513          output.setStatus("Count " + count);
514
515          if (count % wrap == 0) {
            // This block of code turns the 1 million linked lists of length 25 into one giant
            // circular linked list of 25 million.
518            circularLeftShift(first);
519            persist(output, -1, prev, first, null);
520            // At this point the entire loop has been flushed so we can add one of its nodes to the
521            // concurrent walker
522            if (numWalkers > 0) {
523              addFlushed(key.getBytes());
524              if (walkers.isEmpty()) {
525                startWalkers(numWalkers, conf, output);
526              }
527            }
528            first = null;
529            prev = null;
530          }
531        }
532      }
533
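      // Rotates the array one position to the left, e.g. [a, b, c, d] becomes [b, c, d, a].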
534      private static <T> void circularLeftShift(T[] first) {
535        T ez = first[0];
536        System.arraycopy(first, 1, first, 0, first.length - 1);
537        first[first.length - 1] = ez;
538      }
539
540      private void addFlushed(byte[] rowKey) {
541        synchronized (flushedLoops) {
542          flushedLoops.add(Bytes.toLong(rowKey));
543          flushedLoops.notifyAll();
544        }
545      }
546
547      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
548          throws IOException {
549        for (int i = 0; i < current.length; i++) {
550
551          if (i % 100 == 0) {
552            // Tickle progress every so often else maprunner will think us hung
553            output.progress();
554          }
555
556          Put put = new Put(current[i]);
557          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
558
559          if (count >= 0) {
560            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
561          }
562          if (id != null) {
563            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
564          }
565          // See if we are to write multiple columns.
566          if (this.multipleUnevenColumnFamilies) {
567            // Use any column name.
568            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
569            // Use any column name.
570            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
571          }
572          mutator.mutate(put);
573        }
574
575        mutator.flush();
576      }
577
578      private void startWalkers(int numWalkers, Configuration conf, Context context) {
579        LOG.info("Starting " + numWalkers + " concurrent walkers");
580        for (int i = 0; i < numWalkers; i++) {
581          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
582          walker.start();
583          walkers.add(walker);
584        }
585      }
586
587      private void joinWalkers() {
588        walkersStop = true;
589        synchronized (flushedLoops) {
590          flushedLoops.notifyAll();
591        }
592        for (Thread walker : walkers) {
593          try {
594            walker.join();
595          } catch (InterruptedException e) {
596            // no-op
597          }
598        }
599      }
600
601      /**
       * Selects and walks a random flushed loop concurrently with the Generator Mapper by
       * spawning ConcurrentWalkers with specified start nodes. These ConcurrentWalkers are
       * configured to only log erroneous nodes.
605       */
606
607      public class ContinuousConcurrentWalker implements Runnable {
608
609        ConcurrentWalker walker;
610        Configuration conf;
611        Context context;
612        Random rand;
613
614        public ContinuousConcurrentWalker(Configuration conf, Context context) {
615          this.conf = conf;
616          this.context = context;
617          rand = new Random();
618        }
619
620        @Override
621        public void run() {
622          while (!walkersStop) {
623            try {
624              long node = selectLoop();
625              try {
626                walkLoop(node);
627              } catch (IOException e) {
628                context.getCounter(Counts.IOEXCEPTION).increment(1l);
629                return;
630              }
631            } catch (InterruptedException e) {
632              return;
633            }
634          }
635        }
636
637        private void walkLoop(long node) throws IOException {
638          walker = new ConcurrentWalker(context);
639          walker.setConf(conf);
640          walker.run(node, wrap);
641        }
642
        private long selectLoop() throws InterruptedException {
644          synchronized (flushedLoops) {
645            while (flushedLoops.isEmpty() && !walkersStop) {
646              flushedLoops.wait();
647            }
648            if (walkersStop) {
649              throw new InterruptedException();
650            }
651            return flushedLoops.get(rand.nextInt(flushedLoops.size()));
652          }
653        }
654      }
655
656      public static class ConcurrentWalker extends WalkerBase {
657
658        Context context;
659
        public ConcurrentWalker(Context context) {
          this.context = context;
        }
661
662        public void run(long startKeyIn, long maxQueriesIn) throws IOException {
663
664          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
665          byte[] startKey = Bytes.toBytes(startKeyIn);
666
667          Connection connection = ConnectionFactory.createConnection(getConf());
668          Table table = connection.getTable(getTableName(getConf()));
669          long numQueries = 0;
          // Walk one list starting from the given node. Note that in the case of a circular
          // (or P-shaped) list it would walk forever, so the walk is bounded by maxQueries.
673
674          CINode node = findStartNode(table, startKey);
675          if (node == null) {
676            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
677            throw new IOException("Start node not found: " + startKeyIn);
678          }
679          while (numQueries < maxQueries) {
680            numQueries++;
681            byte[] prev = node.prev;
682            long t1 = System.currentTimeMillis();
683            node = getNode(prev, table, node);
684            long t2 = System.currentTimeMillis();
685            if (node == null) {
686              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
687              context.getCounter(Counts.UNDEFINED).increment(1l);
688            } else if (node.prev.length == NO_KEY.length) {
689              LOG.error("ConcurrentWalker found TERMINATING NODE: " +
690                  Bytes.toStringBinary(node.key));
691              context.getCounter(Counts.TERMINATING).increment(1l);
692            } else {
693              // Increment for successful walk
694              context.getCounter(Counts.SUCCESS).increment(1l);
695            }
696          }
697          table.close();
698          connection.close();
699        }
700      }
701    }
702
703    @Override
704    public int run(String[] args) throws Exception {
705      if (args.length < 3) {
706        System.err.println(USAGE);
707        return 1;
708      }
709      try {
710        int numMappers = Integer.parseInt(args[0]);
711        long numNodes = Long.parseLong(args[1]);
712        Path tmpOutput = new Path(args[2]);
713        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
714        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
715        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
716        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
717      } catch (NumberFormatException e) {
718        System.err.println("Parsing generator arguments failed: " + e.getMessage());
719        System.err.println(USAGE);
720        return 1;
721      }
722    }
723
724    protected void createSchema() throws IOException {
725      Configuration conf = getConf();
726      TableName tableName = getTableName(conf);
727      try (Connection conn = ConnectionFactory.createConnection(conf);
728          Admin admin = conn.getAdmin()) {
729        if (!admin.tableExists(tableName)) {
730          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
731          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
732          // Always add these families. Just skip writing to them when we do not test per CF flush.
733          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
734          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
735          // if -DuseMob=true force all data through mob path.
736          if (conf.getBoolean("useMob", false)) {
737            for (HColumnDescriptor hcd : htd.getColumnFamilies() ) {
738              hcd.setMobEnabled(true);
739              hcd.setMobThreshold(4);
740            }
741          }
742
743          // If we want to pre-split compute how many splits.
744          if (conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
745              HBaseTestingUtility.PRESPLIT_TEST_TABLE)) {
746            int numberOfServers = admin.getRegionServers().size();
747            if (numberOfServers == 0) {
748              throw new IllegalStateException("No live regionservers");
749            }
750            int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
751                HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
752            int totalNumberOfRegions = numberOfServers * regionsPerServer;
753            LOG.info("Number of live regionservers: " + numberOfServers + ", " +
754                "pre-splitting table into " + totalNumberOfRegions + " regions " +
755                "(default regions per server: " + regionsPerServer + ")");
756
757
758            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
759
760            admin.createTable(htd, splits);
761          } else {
762            // Looks like we're just letting things play out.
            // Create a table with one region by default.
764            // This will make the splitting work hard.
765            admin.createTable(htd);
766          }
767        }
768      } catch (MasterNotRunningException e) {
769        LOG.error("Master not running", e);
770        throw new IOException(e);
771      }
772    }
773
774    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
775        Integer width, Integer wrapMultiplier, Integer numWalkers)
776        throws Exception {
777      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
778          + ", numNodes=" + numNodes);
779      Job job = Job.getInstance(getConf());
780
781      job.setJobName("Random Input Generator");
782      job.setNumReduceTasks(0);
783      job.setJarByClass(getClass());
784
785      job.setInputFormatClass(GeneratorInputFormat.class);
786      job.setOutputKeyClass(BytesWritable.class);
787      job.setOutputValueClass(NullWritable.class);
788
789      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
790
791      job.setMapperClass(Mapper.class); //identity mapper
792
793      FileOutputFormat.setOutputPath(job, tmpOutput);
794      job.setOutputFormatClass(SequenceFileOutputFormat.class);
795      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
796
797      boolean success = jobCompletion(job);
798
799      return success ? 0 : 1;
800    }
801
802    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
803        Integer width, Integer wrapMultiplier, Integer numWalkers)
804        throws Exception {
805      LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
806      createSchema();
807      job = Job.getInstance(getConf());
808
809      job.setJobName("Link Generator");
810      job.setNumReduceTasks(0);
811      job.setJarByClass(getClass());
812
813      FileInputFormat.setInputPaths(job, tmpOutput);
814      job.setInputFormatClass(OneFilePerMapperSFIF.class);
815      job.setOutputKeyClass(NullWritable.class);
816      job.setOutputValueClass(NullWritable.class);
817
818      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
819
820      setMapperForGenerator(job);
821
822      job.setOutputFormatClass(NullOutputFormat.class);
823
824      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
825      TableMapReduceUtil.addDependencyJars(job);
826      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
827                                                     AbstractHBaseTool.class);
828      TableMapReduceUtil.initCredentials(job);
829
830      boolean success = jobCompletion(job);
831
832      return success ? 0 : 1;
833    }
834
835    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
836        ClassNotFoundException {
837      boolean success = job.waitForCompletion(true);
838      return success;
839    }
840
841    protected void setMapperForGenerator(Job job) {
842      job.setMapperClass(GeneratorMapper.class);
843    }
844
845    public int run(int numMappers, long numNodes, Path tmpOutput,
846        Integer width, Integer wrapMultiplier, Integer numWalkers)
847        throws Exception {
848      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier,
849          numWalkers);
850      if (ret > 0) {
851        return ret;
852      }
853      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
854    }
855
856    public boolean verify() {
857      try {
858        Counters counters = job.getCounters();
859        if (counters == null) {
          LOG.info("Counters object was null, so Generator verification cannot be performed."
              + " This is commonly a result of insufficient YARN configuration.");
862          return false;
863        }
864
865        if (counters.findCounter(Counts.TERMINATING).getValue() > 0 ||
866            counters.findCounter(Counts.UNDEFINED).getValue() > 0 ||
867            counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) {
868          LOG.error("Concurrent walker failed to verify during Generation phase");
869          LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue());
870          LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue());
871          LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue());
872          return false;
873        }
874      } catch (IOException e) {
875        LOG.info("Generator verification could not find counter");
876        return false;
877      }
878      return true;
879    }
880  }
881
882  /**
   * Tool to search for missing rows in WALs and hfiles.
   * Pass in a file or dir of keys to search for. The key file must have been written by the
   * Verify step (we depend on the format it writes out). We'll read the keys in and then search
   * the hbase WALs and oldWALs dirs (some of this is TODO).
887   */
888  static class Search extends Configured implements Tool {
889    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
890    protected Job job;
891
892    private static void printUsage(final String error) {
893      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
894      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
895    }
896
897    @Override
898    public int run(String[] args) throws Exception {
899      if (args.length < 1 || args.length > 2) {
900        printUsage(null);
901        return 1;
902      }
903      Path inputDir = new Path(args[0]);
904      int numMappers = 1;
905      if (args.length > 1) {
906        numMappers = Integer.parseInt(args[1]);
907      }
908      return run(inputDir, numMappers);
909    }
910
911    /**
912     * WALPlayer override that searches for keys loaded in the setup.
913     */
914    public static class WALSearcher extends WALPlayer {
915      public WALSearcher(Configuration conf) {
916        super(conf);
917      }
918
919      /**
920       * The actual searcher mapper.
921       */
922      public static class WALMapperSearcher extends WALMapper {
923        private SortedSet<byte []> keysToFind;
924        private AtomicInteger rows = new AtomicInteger(0);
925
926        @Override
927        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
928            throws IOException {
929          super.setup(context);
930          try {
931            this.keysToFind = readKeysToSearch(context.getConfiguration());
932            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
933          } catch (InterruptedException e) {
934            throw new InterruptedIOException(e.toString());
935          }
936        }
937
938        @Override
939        protected boolean filter(Context context, Cell cell) {
940          // TODO: Can I do a better compare than this copying out key?
941          byte [] row = new byte [cell.getRowLength()];
942          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
943          boolean b = this.keysToFind.contains(row);
944          if (b) {
945            String keyStr = Bytes.toStringBinary(row);
946            try {
947              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
948            } catch (IOException|InterruptedException e) {
949              LOG.warn(e.toString(), e);
950            }
951            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
952              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
953            }
954            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
955          }
956          return b;
957        }
958      }
959
960      // Put in place the above WALMapperSearcher.
961      @Override
962      public Job createSubmittableJob(String[] args) throws IOException {
963        Job job = super.createSubmittableJob(args);
964        // Call my class instead.
965        job.setJarByClass(WALMapperSearcher.class);
966        job.setMapperClass(WALMapperSearcher.class);
967        job.setOutputFormatClass(NullOutputFormat.class);
968        return job;
969      }
970    }
971
972    static final String FOUND_GROUP_KEY = "Found";
973    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
974
975    public int run(Path inputDir, int numMappers) throws Exception {
976      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
977      SortedSet<byte []> keys = readKeysToSearch(getConf());
978      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
979      LOG.info("Count of keys to find: " + keys.size());
980      for(byte [] key: keys)  LOG.info("Key: " + Bytes.toStringBinary(key));
981      // Now read all WALs. In two dirs. Presumes certain layout.
982      Path walsDir = new Path(
983          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
984      Path oldWalsDir = new Path(
985          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
986      LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
987        " against " + getConf().get(HConstants.HBASE_DIR));
988      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
989          new String [] {walsDir.toString(), ""});
990      if (ret != 0) {
991        return ret;
992      }
993      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
994          new String [] {oldWalsDir.toString(), ""});
995    }
996
997    static SortedSet<byte []> readKeysToSearch(final Configuration conf)
998    throws IOException, InterruptedException {
999      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
1000      FileSystem fs = FileSystem.get(conf);
1001      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1002      if (!fs.exists(keysInputDir)) {
1003        throw new FileNotFoundException(keysInputDir.toString());
1004      }
1005      if (!fs.isDirectory(keysInputDir)) {
1006        throw new UnsupportedOperationException("TODO");
1007      } else {
1008        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
1009        while(iterator.hasNext()) {
1010          LocatedFileStatus keyFileStatus = iterator.next();
1011          // Skip "_SUCCESS" file.
1012          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
1013          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
1014        }
1015      }
1016      return result;
1017    }
1018
1019    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
1020        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
1021        InterruptedException {
1022      SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1023      // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
1024      // what is missing.
1025      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
1026      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
1027          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
1028        InputSplit is =
1029          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
1030        rr.initialize(is, context);
1031        while (rr.nextKeyValue()) {
1032          rr.getCurrentKey();
1033          BytesWritable bw = rr.getCurrentValue();
1034          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
1035            byte[] key = new byte[rr.getCurrentKey().getLength()];
1036            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
1037                .getLength());
1038            result.add(key);
1039          }
1040        }
1041      }
1042      return result;
1043    }
1044  }
1045
1046  /**
1047   * A Map Reduce job that verifies that the linked lists generated by
1048   * {@link Generator} do not have any holes.
1049   */
1050  static class Verify extends Configured implements Tool {
1051
1052    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
1053    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
1054    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
1055
1056    protected Job job;
1057
1058    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
1059      private BytesWritable row = new BytesWritable();
1060      private BytesWritable ref = new BytesWritable();
1061
1062      private boolean multipleUnevenColumnFamilies;
1063
1064      @Override
1065      protected void setup(
1066          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
1067          throws IOException, InterruptedException {
1068        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
1069      }
1070
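      // For each row we emit (row, DEF) to mark it as defined, and (prev, row) to mark the row
      // it points at as referenced; the reducer joins these two streams to find holes.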
1071      @Override
1072      protected void map(ImmutableBytesWritable key, Result value, Context context)
1073          throws IOException ,InterruptedException {
1074        byte[] rowKey = key.get();
1075        row.set(rowKey, 0, rowKey.length);
1076        if (multipleUnevenColumnFamilies
1077            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
1078              TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
1079          context.write(row, DEF_LOST_FAMILIES);
1080        } else {
1081          context.write(row, DEF);
1082        }
1083        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
1084        if (prev != null && prev.length > 0) {
1085          ref.set(prev, 0, prev.length);
1086          context.write(ref, row);
1087        } else {
1088          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
1089        }
1090      }
1091    }
1092
1093    /**
     * Don't change the order of these enums. Their ordinals are used as the type flag when we
     * emit problem rows from the reducer.
1096     */
1097    public static enum Counts {
1098      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
1099      LOST_FAMILIES
1100    }
1101
1102    /**
     * Per reducer, we output problem rows as byte arrays so they can be used as input for
     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a one-byte flag
     * saying what sort of emission it is. The flag is the Counts enum ordinal as a short.
1106     */
1107    public static class VerifyReducer extends
1108        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
1109      private ArrayList<byte[]> refs = new ArrayList<>();
1110      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
1111        Counts.UNREFERENCED.ordinal(), new byte[] {}));
1112      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
1113        Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
1114
1115      private AtomicInteger rows = new AtomicInteger(0);
1116      private Connection connection;
1117
1118      @Override
1119      protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1120      throws IOException, InterruptedException {
1121        super.setup(context);
1122        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
1123      }
1124
1125      @Override
1126      protected void cleanup(
1127          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1128          throws IOException, InterruptedException {
1129        if (this.connection != null) {
1130          this.connection.close();
1131        }
1132        super.cleanup(context);
1133      }
1134
1135      /**
       * @param ordinal the Counts ordinal to encode as the type flag
       * @param r the row bytes to append after the flag
       * @return a new byte array that has <code>ordinal</code> as a prefix on the front, taking up
       * Bytes.SIZEOF_SHORT bytes, followed by <code>r</code>
1140       */
1141      public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
1142        byte[] prefix = Bytes.toBytes((short)ordinal);
1143        if (prefix.length != Bytes.SIZEOF_SHORT) {
1144          throw new RuntimeException("Unexpected size: " + prefix.length);
1145        }
1146        byte[] result = new byte[prefix.length + r.length];
1147        System.arraycopy(prefix, 0, result, 0, prefix.length);
1148        System.arraycopy(r, 0, result, prefix.length, r.length);
1149        return result;
1150      }
1151
1152      /**
       * @param bs flagged row bytes as written by {@link #addPrefixFlag(int, byte[])}
       * @return the type from the Counts enum for this row. Reads the prefix added by
       * {@link #addPrefixFlag(int, byte[])}
1156       */
1157      public static Counts whichType(final byte [] bs) {
1158        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
1159        return Counts.values()[ordinal];
1160      }
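
      // Illustrative round trip of the flag encoding above (a sketch; 'row' is a hypothetical
      // byte[] and this snippet is not used by the job itself):
      //   byte[] flagged = addPrefixFlag(Counts.UNDEFINED.ordinal(), row);
      //   Counts type = whichType(flagged);                        // Counts.UNDEFINED
      //   byte[] rowOnly = getRowOnly(new BytesWritable(flagged)); // the original 'row' bytes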
1161
1162      /**
       * @param bw a flagged row value, i.e. one produced with {@link #addPrefixFlag(int, byte[])}
       * @return the row bytes minus the type-flag prefix.
1165       */
1166      public static byte[] getRowOnly(BytesWritable bw) {
1167        byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
1168        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
1169        return bytes;
1170      }
1171
1172      @Override
1173      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
1174          throws IOException, InterruptedException {
1175        int defCount = 0;
1176        boolean lostFamilies = false;
1177        refs.clear();
1178        for (BytesWritable type : values) {
1179          if (type.getLength() == DEF.getLength()) {
1180            defCount++;
1181            if (type.getBytes()[0] == 1) {
1182              lostFamilies = true;
1183            }
1184          } else {
1185            byte[] bytes = new byte[type.getLength()];
1186            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
1187            refs.add(bytes);
1188          }
1189        }
1190
1191        // TODO check for more than one def, should not happen
1192        StringBuilder refsSb = null;
1193        if (defCount == 0 || refs.size() != 1) {
1194          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1195          refsSb = dumpExtraInfoOnRefs(key, context, refs);
1196          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
1197            (refsSb != null? refsSb.toString(): ""));
1198        }
1199        if (lostFamilies) {
1200          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1201          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
1202          context.getCounter(Counts.LOST_FAMILIES).increment(1);
1203          context.write(key, LOSTFAM);
1204        }
1205
1206        if (defCount == 0 && refs.size() > 0) {
1207          // This is bad, found a node that is referenced but not defined. It must have been
1208          // lost, emit some info about this node for debugging purposes.
          // Write out a line per reference. If more than one, flag it.
1210          for (int i = 0; i < refs.size(); i++) {
1211            byte[] bs = refs.get(i);
1212            int ordinal;
1213            if (i <= 0) {
1214              ordinal = Counts.UNDEFINED.ordinal();
1215              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1216              context.getCounter(Counts.UNDEFINED).increment(1);
1217            } else {
1218              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
1219              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1220            }
1221          }
1222          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1223            // Print out missing row; doing get on reference gives info on when the referencer
1224            // was added which can help a little debugging. This info is only available in mapper
1225            // output -- the 'Linked List error Key...' log message above. What we emit here is
1226            // useless for debugging.
1227            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1228            context.getCounter("undef", keyString).increment(1);
1229          }
1230        } else if (defCount > 0 && refs.isEmpty()) {
1231          // node is defined but not referenced
1232          context.write(key, UNREF);
1233          context.getCounter(Counts.UNREFERENCED).increment(1);
1234          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1235            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1236            context.getCounter("unref", keyString).increment(1);
1237          }
1238        } else {
1239          if (refs.size() > 1) {
1240            // Skip first reference.
1241            for (int i = 1; i < refs.size(); i++) {
1242              context.write(key,
1243                new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
1244            }
1245            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
1246          }
1247          // node is defined and referenced
1248          context.getCounter(Counts.REFERENCED).increment(1);
1249        }
1250      }
1251
1252      /**
1253       * Dump out extra info around references if there are any. Helps debugging.
1254       * @return StringBuilder filled with references if any.
1255       * @throws IOException
1256       */
1257      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1258          final List<byte []> refs)
1259      throws IOException {
1260        StringBuilder refsSb = null;
1261        if (refs.isEmpty()) return refsSb;
1262        refsSb = new StringBuilder();
1263        String comma = "";
1264        // If a row is a reference but has no define, print the content of the row that has
1265        // this row as a 'prev'; it will help debug.  The missing row was written just before
1266        // the row we are dumping out here.
1267        TableName tn = getTableName(context.getConfiguration());
1268        try (Table t = this.connection.getTable(tn)) {
1269          for (byte [] ref : refs) {
1270            Result r = t.get(new Get(ref));
1271            List<Cell> cells = r.listCells();
1272            String ts = (cells != null && !cells.isEmpty())?
1273                new java.util.Date(cells.get(0).getTimestamp()).toString(): "";
1274            byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1275            String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
1276            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1277            long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
1278            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1279            String refRegionLocation = "";
1280            String keyRegionLocation = "";
1281            if (b != null && b.length > 0) {
1282              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1283                HRegionLocation hrl = rl.getRegionLocation(b);
1284                if (hrl != null) refRegionLocation = hrl.toString();
1285                // Key here probably has trailing zeros on it.
1286                hrl = rl.getRegionLocation(key.getBytes());
1287                if (hrl != null) keyRegionLocation = hrl.toString();
1288              }
1289            }
            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
              ", refPrevEqualsKey=" +
                (b != null &&
                  Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
1293                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
1294                ", ref row date=" + ts + ", jobStr=" + jobStr +
1295                ", ref row count=" + count +
1296                ", ref row regionLocation=" + refRegionLocation +
1297                ", key row regionLocation=" + keyRegionLocation);
1298            refsSb.append(comma);
1299            comma = ",";
1300            refsSb.append(Bytes.toStringBinary(ref));
1301          }
1302        }
1303        return refsSb;
1304      }
1305    }
1306
1307    @Override
1308    public int run(String[] args) throws Exception {
1309      if (args.length != 2) {
        System.out.println("Usage: " + Verify.class.getSimpleName()
            + " <output dir> <num reducers>");
1312        return 0;
1313      }
1314
1315      String outputDir = args[0];
1316      int numReducers = Integer.parseInt(args[1]);
1317
1318      return run(outputDir, numReducers);
1319    }
1320
1321    public int run(String outputDir, int numReducers) throws Exception {
1322      return run(new Path(outputDir), numReducers);
1323    }
1324
1325    public int run(Path outputDir, int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
1327
1328      job = Job.getInstance(getConf());
1329
1330      job.setJobName("Link Verifier");
1331      job.setNumReduceTasks(numReducers);
1332      job.setJarByClass(getClass());
1333
1334      setJobScannerConf(job);
1335
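      // Full-table scan over just the 'prev' column; a large caching value lets mappers pull
      // rows in big batches, and the block cache is skipped since this one-shot scan would
      // only churn it.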
1336      Scan scan = new Scan();
1337      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1338      scan.setCaching(10000);
1339      scan.setCacheBlocks(false);
1340      if (isMultiUnevenColumnFamilies(getConf())) {
1341        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1342        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1343      }
1344
1345      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1346          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1347      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
1348                                                     AbstractHBaseTool.class);
1349
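      // Disable speculative execution so duplicate map attempts don't redundantly re-scan the
      // same regions.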
1350      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1351
1352      job.setReducerClass(VerifyReducer.class);
1353      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1354      job.setOutputKeyClass(BytesWritable.class);
1355      job.setOutputValueClass(BytesWritable.class);
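      // setOutputPath is a static helper inherited from FileOutputFormat, so this call simply
      // sets the output directory for the SequenceFileAsBinaryOutputFormat configured above.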
1356      TextOutputFormat.setOutputPath(job, outputDir);
1357
1358      boolean success = job.waitForCompletion(true);
1359
1360      if (success) {
1361        Counters counters = job.getCounters();
1362        if (null == counters) {
1363          LOG.warn("Counters were null, cannot verify Job completion."
1364              + " This is commonly a result of insufficient YARN configuration.");
1365          // We don't have access to the counters to know if we have "bad" counts
1366          return 0;
1367        }
1368
1369        // If we find no unexpected values, the job didn't outright fail
1370        if (verifyUnexpectedValues(counters)) {
1371          // We didn't check referenced+unreferenced counts, leave that to visual inspection
1372          return 0;
1373        }
1374      }
1375
1376      // We failed
1377      return 1;
1378    }
1379
1380    public boolean verify(long expectedReferenced) throws Exception {
1381      if (job == null) {
1382        throw new IllegalStateException("You should call run() first");
1383      }
1384
1385      Counters counters = job.getCounters();
1386      if (counters == null) {
1387        LOG.info("Counters object was null, write verification cannot be performed."
1388              + " This is commonly a result of insufficient YARN configuration.");
1389        return false;
1390      }
1391
1392      // Run through each check, even if we fail one early
1393      boolean success = verifyExpectedValues(expectedReferenced, counters);
1394
1395      if (!verifyUnexpectedValues(counters)) {
1396        // We found counter objects which imply failure
1397        success = false;
1398      }
1399
1400      if (!success) {
1401        handleFailure(counters);
1402      }
1403      return success;
1404    }
1405
1406    /**
1407     * Verify the values in the Counters against the expected number of entries written.
1408     *
1409     * @param expectedReferenced
     *          Expected number of referenced entries
1411     * @param counters
1412     *          The Job's Counters object
1413     * @return True if the values match what's expected, false otherwise
1414     */
1415    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1416      final Counter referenced = counters.findCounter(Counts.REFERENCED);
1417      final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
1418      boolean success = true;
1419
1420      if (expectedReferenced != referenced.getValue()) {
        LOG.error("Expected referenced count does not match the actual referenced count. " +
            "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
1423        success = false;
1424      }
1425
1426      if (unreferenced.getValue() > 0) {
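        // A duplicate random key leaves one node referenced twice and another never referenced,
        // so EXTRAREFERENCES matching UNREFERENCED hints at that (benign) cause.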
1427        final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
1428        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
        LOG.error("Unreferenced nodes were not expected. Unreferenced count="
            + unreferenced.getValue()
            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1431        success = false;
1432      }
1433
1434      return success;
1435    }
1436
1437    /**
1438     * Verify that the Counters don't contain values which indicate an outright failure from the Reducers.
1439     *
1440     * @param counters
1441     *          The Job's counters
1442     * @return True if the "bad" counter objects are 0, false otherwise
1443     */
1444    protected boolean verifyUnexpectedValues(Counters counters) {
1445      final Counter undefined = counters.findCounter(Counts.UNDEFINED);
1446      final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
1447      boolean success = true;
1448
1449      if (undefined.getValue() > 0) {
1450        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1451        success = false;
1452      }
1453
1454      if (lostfamilies.getValue() > 0) {
1455        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1456        success = false;
1457      }
1458
1459      return success;
1460    }
1461
1462    protected void handleFailure(Counters counters) throws IOException {
1463      Configuration conf = job.getConfiguration();
1464      TableName tableName = getTableName(conf);
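      // Map every key recorded in the per-key "undef"/"unref" counter groups back to its region
      // location so the failure can be tied to a specific region and server.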
1465      try (Connection conn = ConnectionFactory.createConnection(conf)) {
1466        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
1467          CounterGroup g = counters.getGroup("undef");
1468          Iterator<Counter> it = g.iterator();
1469          while (it.hasNext()) {
1470            String keyString = it.next().getName();
1471            byte[] key = Bytes.toBytes(keyString);
1472            HRegionLocation loc = rl.getRegionLocation(key, true);
1473            LOG.error("undefined row " + keyString + ", " + loc);
1474          }
1475          g = counters.getGroup("unref");
1476          it = g.iterator();
1477          while (it.hasNext()) {
1478            String keyString = it.next().getName();
1479            byte[] key = Bytes.toBytes(keyString);
1480            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("unreferenced row " + keyString + ", " + loc);
1482          }
1483        }
1484      }
1485    }
1486  }
1487
1488  /**
1489   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
1490   * adds more data.
1491   */
1492  static class Loop extends Configured implements Tool {
1493
1494    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " +
        "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" +
        " <num walker threads>] \n" +
        "where <num nodes per mapper> should be a multiple of width*wrap multiplier, 25M by default \n" +
        "walkers will select and verify random flushed loops during Generation.";
1500
1501    IntegrationTestBigLinkedList it;
1502
1503    protected void runGenerator(int numMappers, long numNodes,
1504        String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers)
1505        throws Exception {
1506      Path outputPath = new Path(outputDir);
1507      UUID uuid = UUID.randomUUID(); //create a random UUID.
1508      Path generatorOutput = new Path(outputPath, uuid.toString());
1509
1510      Generator generator = new Generator();
1511      generator.setConf(getConf());
1512      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
1513          numWalkers);
1514      if (retCode > 0) {
1515        throw new RuntimeException("Generator failed with return code: " + retCode);
1516      }
1517      if (numWalkers > 0) {
1518        if (!generator.verify()) {
1519          throw new RuntimeException("Generator.verify failed");
1520        }
1521      }
1522    }
1523
1524    protected void runVerify(String outputDir,
1525        int numReducers, long expectedNumNodes) throws Exception {
1526      Path outputPath = new Path(outputDir);
1527      UUID uuid = UUID.randomUUID(); //create a random UUID.
1528      Path iterationOutput = new Path(outputPath, uuid.toString());
1529
1530      Verify verify = new Verify();
1531      verify.setConf(getConf());
1532      int retCode = verify.run(iterationOutput, numReducers);
1533      if (retCode > 0) {
1534        throw new RuntimeException("Verify.run failed with return code: " + retCode);
1535      }
1536
1537      if (!verify.verify(expectedNumNodes)) {
1538        throw new RuntimeException("Verify.verify failed");
1539      }
1540      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
1541    }
1542
1543    @Override
1544    public int run(String[] args) throws Exception {
1545      if (args.length < 5) {
1546        System.err.println(USAGE);
1547        return 1;
1548      }
1549      try {
1550        int numIterations = Integer.parseInt(args[0]);
1551        int numMappers = Integer.parseInt(args[1]);
1552        long numNodes = Long.parseLong(args[2]);
1553        String outputDir = args[3];
1554        int numReducers = Integer.parseInt(args[4]);
1555        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1556        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1557        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
1558
1559        long expectedNumNodes = 0;
1560
1561        if (numIterations < 0) {
1562          numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
1563        }
1564        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
1565        for (int i = 0; i < numIterations; i++) {
1566          LOG.info("Starting iteration = " + i);
1567          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
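          // Data is never cleaned between iterations, so verify against the running total.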
1568          expectedNumNodes += numMappers * numNodes;
1569          runVerify(outputDir, numReducers, expectedNumNodes);
1570        }
1571        return 0;
1572      } catch (NumberFormatException e) {
1573        System.err.println("Parsing loop arguments failed: " + e.getMessage());
1574        System.err.println(USAGE);
1575        return 1;
1576      }
1577    }
1578  }
1579
1580  /**
   * A stand alone program that prints out portions of a list created by {@link Generator}.
1582   */
1583  private static class Print extends Configured implements Tool {
1584    @Override
1585    public int run(String[] args) throws Exception {
1586      Options options = new Options();
1587      options.addOption("s", "start", true, "start key");
1588      options.addOption("e", "end", true, "end key");
1589      options.addOption("l", "limit", true, "number to print");
1590
1591      GnuParser parser = new GnuParser();
1592      CommandLine cmd = null;
1593      try {
1594        cmd = parser.parse(options, args);
1595        if (cmd.getArgs().length != 0) {
1596          throw new ParseException("Command takes no arguments");
1597        }
1598      } catch (ParseException e) {
        System.err.println("Failed to parse command line: " + e.getMessage());
1600        System.err.println();
1601        HelpFormatter formatter = new HelpFormatter();
1602        formatter.printHelp(getClass().getSimpleName(), options);
1603        System.exit(-1);
1604      }
1605
1606      Connection connection = ConnectionFactory.createConnection(getConf());
1607      Table table = connection.getTable(getTableName(getConf()));
1608
1609      Scan scan = new Scan();
1610      scan.setBatch(10000);
1611
      if (cmd.hasOption("s")) {
        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
      }
      if (cmd.hasOption("e")) {
        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
      }
      int limit = cmd.hasOption("l") ? Integer.parseInt(cmd.getOptionValue("l")) : 100;
1623
1624      ResultScanner scanner = table.getScanner(scan);
1625
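      // Print up to 'limit' rows as key:prev:count:client, reusing a single CINode instance.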
1626      CINode node = new CINode();
1627      Result result = scanner.next();
1628      int count = 0;
1629      while (result != null && count++ < limit) {
1630        node = getCINode(result, node);
1631        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
1632            Bytes.toStringBinary(node.prev), node.count, node.client);
1633        result = scanner.next();
1634      }
1635      scanner.close();
1636      table.close();
1637      connection.close();
1638
1639      return 0;
1640    }
1641  }
1642
1643  /**
1644   * A stand alone program that deletes a single node.
1645   */
1646  private static class Delete extends Configured implements Tool {
1647    @Override
1648    public int run(String[] args) throws Exception {
1649      if (args.length != 1) {
        System.out.println("Usage: " + Delete.class.getSimpleName() + " <node to delete>");
1651        return 0;
1652      }
1653      byte[] val = Bytes.toBytesBinary(args[0]);
1654
1655      org.apache.hadoop.hbase.client.Delete delete
1656        = new org.apache.hadoop.hbase.client.Delete(val);
1657
1658      try (Connection connection = ConnectionFactory.createConnection(getConf());
1659          Table table = connection.getTable(getTableName(getConf()))) {
1660        table.delete(delete);
1661      }
1662
1663      System.out.println("Delete successful");
1664      return 0;
1665    }
1666  }
1667
  abstract static class WalkerBase extends Configured {
1669    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
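      // Scan forward from startKey and return the first node found; the "FSR" line logs how
      // long the start-row lookup took in milliseconds.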
1670      Scan scan = new Scan();
1671      scan.setStartRow(startKey);
1672      scan.setBatch(1);
1673      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1674
1675      long t1 = System.currentTimeMillis();
1676      ResultScanner scanner = table.getScanner(scan);
1677      Result result = scanner.next();
1678      long t2 = System.currentTimeMillis();
1679      scanner.close();
1680
      if (result != null) {
1682        CINode node = getCINode(result, new CINode());
1683        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1684        return node;
1685      }
1686
1687      System.out.println("FSR " + (t2 - t1));
1688
1689      return null;
1690    }
1691    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1692      Get get = new Get(row);
1693      get.addColumn(FAMILY_NAME, COLUMN_PREV);
1694      Result result = table.get(get);
1695      return getCINode(result, node);
1696    }
1697  }
1698  /**
1699   * A stand alone program that follows a linked list created by {@link Generator} and prints
1700   * timing info.
1701   */
1702  private static class Walker extends WalkerBase implements Tool {
1703
1704    public Walker(){}
1705
1706    @Override
1707    public int run(String[] args) throws IOException {
1708
1709      Options options = new Options();
1710      options.addOption("n", "num", true, "number of queries");
1711      options.addOption("s", "start", true, "key to start at, binary string");
1712      options.addOption("l", "logevery", true, "log every N queries");
1713
1714      GnuParser parser = new GnuParser();
1715      CommandLine cmd = null;
1716      try {
1717        cmd = parser.parse(options, args);
1718        if (cmd.getArgs().length != 0) {
1719          throw new ParseException("Command takes no arguments");
1720        }
1721      } catch (ParseException e) {
        System.err.println("Failed to parse command line: " + e.getMessage());
1723        System.err.println();
1724        HelpFormatter formatter = new HelpFormatter();
1725        formatter.printHelp(getClass().getSimpleName(), options);
1726        System.exit(-1);
1727      }
1728
1729      long maxQueries = Long.MAX_VALUE;
1730      if (cmd.hasOption('n')) {
1731        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1732      }
1733      Random rand = new SecureRandom();
1734      boolean isSpecificStart = cmd.hasOption('s');
1735
1736      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1737      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1738
1739      Connection connection = ConnectionFactory.createConnection(getConf());
1740      Table table = connection.getTable(getTableName(getConf()));
1741      long numQueries = 0;
1742      // If isSpecificStart is set, only walk one list from that particular node.
      // Note that in the case of a circular (or P-shaped) list it will walk forever, just as
      // a normal run without a startKey does.
1745      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1746        if (!isSpecificStart) {
1747          startKey = new byte[ROWKEY_LENGTH];
1748          rand.nextBytes(startKey);
1749        }
1750        CINode node = findStartNode(table, startKey);
1751        if (node == null && isSpecificStart) {
1752          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1753        }
1754        numQueries++;
1755        while (node != null && node.prev.length != NO_KEY.length &&
1756            numQueries < maxQueries) {
1757          byte[] prev = node.prev;
1758          long t1 = System.currentTimeMillis();
1759          node = getNode(prev, table, node);
1760          long t2 = System.currentTimeMillis();
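          // "CQ" lines report the latency in milliseconds of each hop back along the list.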
1761          if (logEvery > 0 && numQueries % logEvery == 0) {
1762            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1763          }
1764          numQueries++;
1765          if (node == null) {
1766            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1767          } else if (node.prev.length == NO_KEY.length) {
1768            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1769          }
1770        }
1771      }
1772      table.close();
1773      connection.close();
1774      return 0;
1775    }
1776  }
1777
1778  private static class Clean extends Configured implements Tool {
1779    @Override public int run(String[] args) throws Exception {
1780      if (args.length < 1) {
1781        System.err.println("Usage: Clean <output dir>");
1782        return -1;
1783      }
1784
1785      Path p = new Path(args[0]);
1786      Configuration conf = getConf();
1787      TableName tableName = getTableName(conf);
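      // Drop the test table (disable, then delete) and remove the job output directory.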
1788      try (FileSystem fs = HFileSystem.get(conf);
1789          Connection conn = ConnectionFactory.createConnection(conf);
1790          Admin admin = conn.getAdmin()) {
1791        if (admin.tableExists(tableName)) {
1792          admin.disableTable(tableName);
1793          admin.deleteTable(tableName);
1794        }
1795
1796        if (fs.exists(p)) {
1797          fs.delete(p, true);
1798        }
1799      }
1800
1801      return 0;
1802    }
1803  }
1804
1805  static TableName getTableName(Configuration conf) {
1806    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1807  }
1808
1809  private static CINode getCINode(Result result, CINode node) {
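    // Populate the reusable CINode from a scan Result; missing columns fall back to NO_KEY,
    // a count of -1, and an empty client string.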
1810    node.key = Bytes.copy(result.getRow());
1811    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1812      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1813    } else {
1814      node.prev = NO_KEY;
1815    }
1816    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1817      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1818    } else {
1819      node.count = -1;
1820    }
1821    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1822      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1823    } else {
1824      node.client = "";
1825    }
1826    return node;
1827  }
1828
1829  protected IntegrationTestingUtility util;
1830
1831  @Override
1832  public void setUpCluster() throws Exception {
1833    util = getTestingUtil(getConf());
1834    boolean isDistributed = util.isDistributedCluster();
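    // A distributed cluster just needs to be up already; a local run also starts a mini
    // MapReduce cluster so the Generator/Verify jobs have somewhere to execute.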
1835    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1836    if (!isDistributed) {
1837      util.startMiniMapReduceCluster();
1838    }
1839    this.setConf(util.getConfiguration());
1840  }
1841
1842  @Override
1843  public void cleanUpCluster() throws Exception {
1844    super.cleanUpCluster();
1845    if (util.isDistributedCluster()) {
1846      util.shutdownMiniMapReduceCluster();
1847    }
1848  }
1849
1850  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
1852  }
1853
1854  @Test
  public void testContinuousIngest() throws Exception {
1856    //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1857    Configuration conf = getTestingUtil(getConf()).getConfiguration();
1858    if (isMultiUnevenColumnFamilies(getConf())) {
1859      // make sure per CF flush is on
1860      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
1861    }
1862    int ret =
1863        ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1864            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1865    org.junit.Assert.assertEquals(0, ret);
1866  }
1867
1868  private void usage() {
1869    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1870    printCommands();
1871  }
1872
1873  private void printCommands() {
1874    System.err.println("Commands:");
1875    System.err.println(" generator  Map only job that generates data.");
    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
    System.err.println("            counts after running. REFERENCED and UNREFERENCED counts are");
    System.err.println("            ok; any UNDEFINED count is bad. Do not run concurrently with");
    System.err.println("            the Generator.");
1880    System.err.println(" walker     " +
1881      "Standalone program that starts following a linked list & emits timing info.");
1882    System.err.println(" print      Standalone program that prints nodes in the linked list.");
    System.err.println(" delete     Standalone program that deletes a single node.");
    System.err.println(" loop       Program that loops through the Generator and Verify steps.");
    System.err.println(" clean      Program that cleans up all leftover detritus.");
1886    System.err.println(" search     Search for missing keys.");
1887    System.err.println("");
1888    System.err.println("General options:");
    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
    System.err.println("    Run using <tableName> as the table name.  Defaults to "
        + DEFAULT_TABLE_NAME);
    System.err.println(" -D" + HBaseTestingUtility.REGIONS_PER_SERVER_KEY + "=<# regions>");
1893    System.err.println("    Create table with presplit regions per server.  Defaults to "
1894        + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
1895
1896    System.err.println(" -DuseMob=<true|false>");
1897    System.err.println("    Create table so that the mob read/write path is forced.  " +
1898        "Defaults to false");
1899
1900    System.err.flush();
1901  }
1902
1903  @Override
1904  protected void processOptions(CommandLine cmd) {
1905    super.processOptions(cmd);
1906    String[] args = cmd.getArgs();
1907    //get the class, run with the conf
1908    if (args.length < 1) {
1909      printUsage(this.getClass().getSimpleName() +
1910        " <general options> COMMAND [<COMMAND options>]", "General options:", "");
1911      printCommands();
      // Have to throw an exception here to stop the processing. Looks ugly but gets the
      // message across.
      throw new RuntimeException("Incorrect number of args.");
1914    }
1915    toRun = args[0];
1916    otherArgs = Arrays.copyOfRange(args, 1, args.length);
1917  }
1918
1919  @Override
1920  public int runTestFromCommandLine() throws Exception {
1921    Tool tool = null;
1922    if (toRun.equalsIgnoreCase("Generator")) {
1923      tool = new Generator();
1924    } else if (toRun.equalsIgnoreCase("Verify")) {
1925      tool = new Verify();
1926    } else if (toRun.equalsIgnoreCase("Loop")) {
1927      Loop loop = new Loop();
1928      loop.it = this;
1929      tool = loop;
1930    } else if (toRun.equalsIgnoreCase("Walker")) {
1931      tool = new Walker();
1932    } else if (toRun.equalsIgnoreCase("Print")) {
1933      tool = new Print();
1934    } else if (toRun.equalsIgnoreCase("Delete")) {
1935      tool = new Delete();
1936    } else if (toRun.equalsIgnoreCase("Clean")) {
1937      tool = new Clean();
1938    } else if (toRun.equalsIgnoreCase("Search")) {
1939      tool = new Search();
1940    } else {
1941      usage();
1942      throw new RuntimeException("Unknown arg");
1943    }
1944
1945    return ToolRunner.run(getConf(), tool, otherArgs);
1946  }
1947
1948  @Override
1949  public TableName getTablename() {
1950    Configuration c = getConf();
1951    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1952  }
1953
1954  @Override
1955  protected Set<String> getColumnFamilies() {
1956    if (isMultiUnevenColumnFamilies(getConf())) {
1957      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
1958        Bytes.toString(TINY_FAMILY_NAME));
1959    } else {
1960      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1961    }
1962  }
1963
1964  private static void setJobConf(Job job, int numMappers, long numNodes,
1965      Integer width, Integer wrapMultiplier, Integer numWalkers) {
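    // A null width, wrapMultiplier, or numWalkers leaves the corresponding key unset so the
    // Generator falls back to its configured defaults.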
1966    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1967    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1968    if (width != null) {
1969      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1970    }
1971    if (wrapMultiplier != null) {
1972      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1973    }
1974    if (numWalkers != null) {
1975      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
1976    }
1977  }
1978
1979  public static void setJobScannerConf(Job job) {
1980    // Make sure scanners log something useful to make debugging possible.
1981    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
1982    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1983  }
1984
1985  public static void main(String[] args) throws Exception {
1986    Configuration conf = HBaseConfiguration.create();
1987    IntegrationTestingUtility.setUseDistributedCluster(conf);
1988    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1989    System.exit(ret);
1990  }
1991}