001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.test;
019
020import static org.apache.hadoop.hbase.IntegrationTestingUtility.DEFAULT_REGIONS_PER_SERVER;
021import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE;
022import static org.apache.hadoop.hbase.IntegrationTestingUtility.PRESPLIT_TEST_TABLE_KEY;
023import static org.apache.hadoop.hbase.IntegrationTestingUtility.REGIONS_PER_SERVER_KEY;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025
026import java.io.DataInput;
027import java.io.DataOutput;
028import java.io.FileNotFoundException;
029import java.io.IOException;
030import java.io.InterruptedIOException;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Set;
036import java.util.SortedSet;
037import java.util.TreeSet;
038import java.util.UUID;
039import java.util.concurrent.ThreadLocalRandom;
040import java.util.concurrent.atomic.AtomicInteger;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.conf.Configured;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.LocatedFileStatus;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.fs.RemoteIterator;
047import org.apache.hadoop.hbase.Cell;
048import org.apache.hadoop.hbase.HBaseConfiguration;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.HRegionLocation;
051import org.apache.hadoop.hbase.IntegrationTestBase;
052import org.apache.hadoop.hbase.IntegrationTestingUtility;
053import org.apache.hadoop.hbase.MasterNotRunningException;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.client.Admin;
056import org.apache.hadoop.hbase.client.BufferedMutator;
057import org.apache.hadoop.hbase.client.BufferedMutatorParams;
058import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
059import org.apache.hadoop.hbase.client.Connection;
060import org.apache.hadoop.hbase.client.ConnectionConfiguration;
061import org.apache.hadoop.hbase.client.ConnectionFactory;
062import org.apache.hadoop.hbase.client.Get;
063import org.apache.hadoop.hbase.client.Mutation;
064import org.apache.hadoop.hbase.client.Put;
065import org.apache.hadoop.hbase.client.RegionLocator;
066import org.apache.hadoop.hbase.client.Result;
067import org.apache.hadoop.hbase.client.ResultScanner;
068import org.apache.hadoop.hbase.client.Scan;
069import org.apache.hadoop.hbase.client.Table;
070import org.apache.hadoop.hbase.client.TableDescriptor;
071import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
072import org.apache.hadoop.hbase.fs.HFileSystem;
073import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
074import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
075import org.apache.hadoop.hbase.mapreduce.TableMapper;
076import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
077import org.apache.hadoop.hbase.mapreduce.WALPlayer;
078import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
079import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
080import org.apache.hadoop.hbase.testclassification.IntegrationTests;
081import org.apache.hadoop.hbase.util.AbstractHBaseTool;
082import org.apache.hadoop.hbase.util.Bytes;
083import org.apache.hadoop.hbase.util.CommonFSUtils;
084import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
085import org.apache.hadoop.hbase.util.Random64;
086import org.apache.hadoop.hbase.util.RegionSplitter;
087import org.apache.hadoop.hbase.wal.WALEdit;
088import org.apache.hadoop.hbase.wal.WALKey;
089import org.apache.hadoop.io.BytesWritable;
090import org.apache.hadoop.io.NullWritable;
091import org.apache.hadoop.io.Writable;
092import org.apache.hadoop.mapreduce.Counter;
093import org.apache.hadoop.mapreduce.CounterGroup;
094import org.apache.hadoop.mapreduce.Counters;
095import org.apache.hadoop.mapreduce.InputFormat;
096import org.apache.hadoop.mapreduce.InputSplit;
097import org.apache.hadoop.mapreduce.Job;
098import org.apache.hadoop.mapreduce.JobContext;
099import org.apache.hadoop.mapreduce.Mapper;
100import org.apache.hadoop.mapreduce.RecordReader;
101import org.apache.hadoop.mapreduce.Reducer;
102import org.apache.hadoop.mapreduce.TaskAttemptContext;
103import org.apache.hadoop.mapreduce.TaskAttemptID;
104import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
105import org.apache.hadoop.mapreduce.lib.input.FileSplit;
106import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
107import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
108import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
109import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
110import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
111import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
112import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
113import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
114import org.apache.hadoop.util.Tool;
115import org.apache.hadoop.util.ToolRunner;
116import org.junit.jupiter.api.Tag;
117import org.junit.jupiter.api.Test;
118import org.slf4j.Logger;
119import org.slf4j.LoggerFactory;
120
121import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
122import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
123import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
124import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
125import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
126import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
127import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
128import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
129
130/**
131 * <p>
132 * This is an integration test borrowed from goraci, written by Keith Turner, which is in turn
 * inspired by the Accumulo test called continuous ingest (ci). The original source code can be found
134 * here:
135 * <ul>
136 * <li>https://github.com/keith-turner/goraci</li>
137 * <li>https://github.com/enis/goraci/</li>
138 * </ul>
139 * </p>
140 * <p>
141 * Apache Accumulo [0] has a simple test suite that verifies that data is not lost at scale. This
142 * test suite is called continuous ingest. This test runs many ingest clients that continually
143 * create linked lists containing 25 million nodes. At some point the clients are stopped and a map
144 * reduce job is run to ensure no linked list has a hole. A hole indicates data was lost.
145 * </p>
146 * <p>
147 * The nodes in the linked list are random. This causes each linked list to spread across the table.
148 * Therefore if one part of a table loses data, then it will be detected by references in another
149 * part of the table.
150 * </p>
151 * <p>
 * <h3>THE ANATOMY OF THE TEST</h3> Below is a rough sketch of how data is written. For specific
153 * details look at the Generator code.
154 * </p>
155 * <p>
156 * <ol>
157 * <li>Write out 1 million nodes (1M is the configurable 'width' mentioned below)</li>
158 * <li>Flush the client</li>
159 * <li>Write out 1 million that reference previous million</li>
160 * <li>If this is the 25th set of 1 million nodes, then update 1st set of million to point to last
 * (25 is configurable; it's the 'wrap multiplier' referred to below)</li>
162 * <li>goto 1</li>
163 * </ol>
164 * </p>
165 * <p>
166 * The key is that nodes only reference flushed nodes. Therefore a node should never reference a
167 * missing node, even if the ingest client is killed at any point in time.
168 * </p>
169 * <p>
170 * When running this test suite w/ Accumulo there is a script running in parallel called the
 * Agitator that randomly and continuously kills server processes. The outcome was that many data
172 * loss bugs were found in Accumulo by doing this. This test suite can also help find bugs that
173 * impact uptime and stability when run for days or weeks.
174 * </p>
175 * <p>
 * This test suite consists of the following
177 * <ul>
178 * <li>a few Java programs</li>
179 * <li>a little helper script to run the java programs</li>
180 * <li>a maven script to build it</li>
181 * </ul>
182 * </p>
183 * <p>
 * When generating data, it's best to have each map task generate a multiple of 25 million. The
 * reason for this is that circular linked lists are generated every 25M. Not generating a multiple
 * of 25M will result in some nodes in the linked list not having references. The loss of an
 * unreferenced node can not be detected.
188 * </p>
189 * <p>
190 * <h3>Below is a description of the Java programs</h3>
191 * <ul>
 * <li>{@code Generator} - A map only job that generates data. As stated previously, it's best to
193 * generate data in multiples of 25M. An option is also available to allow concurrent walkers to
194 * select and walk random flushed loops during this phase.</li>
195 * <li>{@code Verify} - A map reduce job that looks for holes. Look at the counts after running.
196 * {@code REFERENCED} and {@code UNREFERENCED} are ok, any {@code UNDEFINED} counts are bad. Do not
197 * run at the same time as the Generator.</li>
 * <li>{@code Walker} - A standalone program that starts following a linked list and emits timing
199 * info.</li>
200 * <li>{@code Print} - A standalone program that prints nodes in the linked list</li>
201 * <li>{@code Delete} - A standalone program that deletes a single node</li>
202 * </ul>
203 * This class can be run as a unit test, as an integration test, or from the command line
204 * </p>
205 * <p>
206 * ex:
207 *
208 * <pre>
209 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
210 *    loop 2 1 100000 /temp 1 1000 50 1 0
211 * </pre>
212 * </p>
213 */
214@Tag(IntegrationTests.TAG)
215public class IntegrationTestBigLinkedList extends IntegrationTestBase {
216  protected static final byte[] NO_KEY = new byte[1];
217  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
218  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
219  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
220  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
221  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
222
223  // link to the id of the prev node in the linked list
224  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
225
226  // identifier of the mapred task that generated this row
227  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
228
229  // the id of the row within the same client.
230  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
231
232  /** How many rows to write per map task. This has to be a multiple of 25M */
233  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY =
234    "IntegrationTestBigLinkedList.generator.num_rows";
235
236  private static final String GENERATOR_NUM_MAPPERS_KEY =
237    "IntegrationTestBigLinkedList.generator.map.tasks";
238
239  private static final String GENERATOR_WIDTH_KEY = "IntegrationTestBigLinkedList.generator.width";
240
241  private static final String GENERATOR_WRAP_KEY = "IntegrationTestBigLinkedList.generator.wrap";
242
243  private static final String CONCURRENT_WALKER_KEY =
244    "IntegrationTestBigLinkedList.generator.concurrentwalkers";
245
246  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
247
248  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
249
250  private static final int WIDTH_DEFAULT = 1000000;
251
252  /**
253   * The 'wrap multipler' default.
254   */
255  private static final int WRAP_DEFAULT = 25;
256  private static final int ROWKEY_LENGTH = 16;
257
258  private static final int CONCURRENT_WALKER_DEFAULT = 0;
259
260  protected String toRun;
261  protected String[] otherArgs;
262
263  static class CINode {
264    byte[] key;
265    byte[] prev;
266    String client;
267    long count;
268  }
269
270  /**
271   * A Map only job that generates random linked list and stores them.
272   */
273  static class Generator extends Configured implements Tool {
274    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);
275
276    /**
277     * Set this configuration if you want to test single-column family flush works. If set, we will
278     * add a big column family and a small column family on either side of the usual ITBLL 'meta'
279     * column family. When we write out the ITBLL, we will also add to the big column family a value
280     * bigger than that for ITBLL and for small, something way smaller. The idea is that when
281     * flush-by-column family rather than by region is enabled, we can see if ITBLL is broke in any
282     * way. Here is how you would pass it:
283     * <p>
284     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
285     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
286     */
287    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
288      "generator.multiple.columnfamilies";
289
290    /**
291     * Set this configuration if you want to scale up the size of test data quickly.
292     * <p>
293     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
294     * -Dgenerator.big.family.value.size=1024 generator 1 10 output
295     */
296    public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";
297
298    public static enum GeneratorCounts {
299      SUCCESS,
300      TERMINATING,
301      UNDEFINED,
302      IOEXCEPTION
303    }
304
305    public static final String USAGE = "Usage : " + Generator.class.getSimpleName()
306      + " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>"
307      + " <num walker threads>] \n"
308      + "Where <num nodes per map> should be a multiple of 'width' * 'wrap multiplier'.\n"
309      + "25M is default because default 'width' is 1M and default 'wrap multiplier' is 25.\n"
310      + "We write out 1M nodes and then flush the client. After 25 flushes, we connect \n"
311      + "first written nodes back to the 25th set.\n"
312      + "Walkers verify random flushed loops during Generation.";
313
314    public Job job;
315
316    static class GeneratorInputFormat extends InputFormat<BytesWritable, NullWritable> {
317      static class GeneratorInputSplit extends InputSplit implements Writable {
318        @Override
319        public long getLength() throws IOException, InterruptedException {
320          return 1;
321        }
322
323        @Override
324        public String[] getLocations() throws IOException, InterruptedException {
325          return new String[0];
326        }
327
328        @Override
329        public void readFields(DataInput arg0) throws IOException {
330        }
331
332        @Override
333        public void write(DataOutput arg0) throws IOException {
334        }
335      }
336
337      static class GeneratorRecordReader extends RecordReader<BytesWritable, NullWritable> {
338        private long count;
339        private long numNodes;
340        // Use Random64 to avoid issue described in HBASE-21256.
341        private Random64 rand = new Random64();
342
343        @Override
344        public void close() throws IOException {
345        }
346
347        @Override
348        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
349          byte[] bytes = new byte[ROWKEY_LENGTH];
350          rand.nextBytes(bytes);
351          return new BytesWritable(bytes);
352        }
353
354        @Override
355        public NullWritable getCurrentValue() throws IOException, InterruptedException {
356          return NullWritable.get();
357        }
358
359        @Override
360        public float getProgress() throws IOException, InterruptedException {
361          return (float) (count / (double) numNodes);
362        }
363
364        @Override
365        public void initialize(InputSplit arg0, TaskAttemptContext context)
366          throws IOException, InterruptedException {
367          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
368        }
369
370        @Override
371        public boolean nextKeyValue() throws IOException, InterruptedException {
372          return count++ < numNodes;
373        }
374      }
375
376      @Override
377      public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
378        TaskAttemptContext context) throws IOException, InterruptedException {
379        GeneratorRecordReader rr = new GeneratorRecordReader();
380        rr.initialize(split, context);
381        return rr;
382      }
383
384      @Override
385      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
386        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
387
388        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);
389
390        for (int i = 0; i < numMappers; i++) {
391          splits.add(new GeneratorInputSplit());
392        }
393
394        return splits;
395      }
396    }
397
398    /** Ensure output files from prev-job go to map inputs for current job */
399    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
400      @Override
401      protected boolean isSplitable(JobContext context, Path filename) {
402        return false;
403      }
404    }
405
406    /**
407     * Some ASCII art time:
408     * <p>
     * [ . . . ] represents one batch of WIDTH random row keys (ROWKEY_LENGTH bytes each)
410     *
411     * <pre>
412     *                _________________________
413     *               |                  ______ |
414     *               |                 |      ||
415     *             .-+-----------------+-----.||
416     *             | |                 |     |||
417     * first   = [ . . . . . . . . . . . ]   |||
418     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
419     *             | | | | | | | | | | |     |||
420     * prev    = [ . . . . . . . . . . . ]   |||
421     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
422     *             | | | | | | | | | | |     |||
423     * current = [ . . . . . . . . . . . ]   |||
424     *                                       |||
425     * ...                                   |||
426     *                                       |||
427     * last    = [ . . . . . . . . . . . ]   |||
428     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
429     *             |                 |________||
430     *             |___________________________|
431     * </pre>
432     */
433
434    static class GeneratorMapper
435      extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
436
437      byte[][] first = null;
438      byte[][] prev = null;
439      byte[][] current = null;
440      byte[] id;
441      long count = 0;
442      int i;
443      BufferedMutator mutator;
444      Connection connection;
445      long numNodes;
446      long wrap;
447      int width;
448      boolean multipleUnevenColumnFamilies;
449      byte[] tinyValue = new byte[] { 't' };
450      byte[] bigValue = null;
451      Configuration conf;
452      // Use Random64 to avoid issue described in HBASE-21256.
453      private Random64 rand = new Random64();
454
455      volatile boolean walkersStop;
456      int numWalkers;
457      final Object flushedLoopsLock = new Object();
458      volatile List<Long> flushedLoops = new ArrayList<>();
459      List<Thread> walkers = new ArrayList<>();
460
461      @Override
462      protected void setup(Context context) throws IOException, InterruptedException {
463        id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID());
464        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
465        instantiateHTable();
466        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
467        current = new byte[this.width][];
468        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
469        this.wrap = (long) wrapMultiplier * width;
470        this.numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY,
471          (long) WIDTH_DEFAULT * WRAP_DEFAULT);
472        if (this.numNodes < this.wrap) {
473          this.wrap = this.numNodes;
474        }
475        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
476        this.numWalkers =
477          context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
478        this.walkersStop = false;
479        this.conf = context.getConfiguration();
480
481        if (multipleUnevenColumnFamilies) {
482          int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
483          int limit =
484            context.getConfiguration().getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
485              ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);
486
487          Preconditions.checkArgument(n <= limit, "%s(%s) > %s(%s)", BIG_FAMILY_VALUE_SIZE_KEY, n,
488            ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);
489
490          bigValue = new byte[n];
491          rand.nextBytes(bigValue);
492          LOG.info("Create a bigValue with " + n + " bytes.");
493        }
494
495        Preconditions.checkArgument(numNodes > 0, "numNodes(%s) <= 0", numNodes);
496        Preconditions.checkArgument(numNodes % width == 0, "numNodes(%s) mod width(%s) != 0",
497          numNodes, width);
498        Preconditions.checkArgument(numNodes % wrap == 0, "numNodes(%s) mod wrap(%s) != 0",
499          numNodes, wrap);
500      }
501
502      protected void instantiateHTable() throws IOException {
503        mutator = connection
504          .getBufferedMutator(new BufferedMutatorParams(getTableName(connection.getConfiguration()))
505            .writeBufferSize(4 * 1024 * 1024));
506      }
507
508      @Override
509      protected void cleanup(Context context) throws IOException, InterruptedException {
510        joinWalkers();
511        mutator.close();
512        connection.close();
513      }
514
515      @Override
516      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
517        current[i] = new byte[key.getLength()];
518        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
519        if (++i == current.length) {
520          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i=", current.length,
521            count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i);
522          persist(output, count, prev, current, id);
523          i = 0;
524
525          if (first == null) {
526            first = current;
527          }
528          prev = current;
529          current = new byte[this.width][];
530
531          count += current.length;
532          output.setStatus("Count " + count);
533
534          if (count % wrap == 0) {
535            // this block of code turns the 1 million linked list of length 25 into one giant
536            // circular linked list of 25 million
537            circularLeftShift(first);
538            persist(output, -1, prev, first, null);
539            // At this point the entire loop has been flushed so we can add one of its nodes to the
540            // concurrent walker
541            if (numWalkers > 0) {
542              addFlushed(key.getBytes());
543              if (walkers.isEmpty()) {
544                startWalkers(numWalkers, conf, output);
545              }
546            }
547            first = null;
548            prev = null;
549          }
550        }
551      }
552
553      private static <T> void circularLeftShift(T[] first) {
554        T ez = first[0];
555        System.arraycopy(first, 1, first, 0, first.length - 1);
556        first[first.length - 1] = ez;
557      }
558
559      private void addFlushed(byte[] rowKey) {
560        synchronized (flushedLoopsLock) {
561          flushedLoops.add(Bytes.toLong(rowKey));
562          flushedLoopsLock.notifyAll();
563        }
564      }
565
566      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
567        throws IOException {
568        for (int i = 0; i < current.length; i++) {
569
570          if (i % 100 == 0) {
571            // Tickle progress every so often else maprunner will think us hung
572            output.progress();
573          }
574
575          Put put = new Put(current[i]);
576          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
577
578          if (count >= 0) {
579            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
580          }
581          if (id != null) {
582            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
583          }
584          // See if we are to write multiple columns.
585          if (this.multipleUnevenColumnFamilies) {
586            // Use any column name.
587            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
588            // Use any column name.
589            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
590          }
591          mutator.mutate(put);
592        }
593
594        mutator.flush();
595      }
596
597      private void startWalkers(int numWalkers, Configuration conf, Context context) {
598        LOG.info("Starting " + numWalkers + " concurrent walkers");
599        for (int i = 0; i < numWalkers; i++) {
600          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
601          walker.start();
602          walkers.add(walker);
603        }
604      }
605
606      private void joinWalkers() {
607        synchronized (flushedLoopsLock) {
608          walkersStop = true;
609          flushedLoopsLock.notifyAll();
610        }
611        for (Thread walker : walkers) {
612          Uninterruptibles.joinUninterruptibly(walker);
613        }
614      }
615
616      /**
617       * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by
618       * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are
619       * configured to only log erroneous nodes.
620       */
621
622      public class ContinuousConcurrentWalker implements Runnable {
623
624        ConcurrentWalker walker;
625        Configuration conf;
626        Context context;
627
628        public ContinuousConcurrentWalker(Configuration conf, Context context) {
629          this.conf = conf;
630          this.context = context;
631        }
632
633        @Override
634        public void run() {
635          while (!walkersStop) {
636            try {
637              long node = selectLoop();
638              try {
639                walkLoop(node);
640              } catch (IOException e) {
641                context.getCounter(GeneratorCounts.IOEXCEPTION).increment(1l);
642                return;
643              }
644            } catch (InterruptedException e) {
645              return;
646            }
647          }
648        }
649
650        private void walkLoop(long node) throws IOException {
651          walker = new ConcurrentWalker(context);
652          walker.setConf(conf);
653          walker.run(node, wrap);
654        }
655
656        private long selectLoop() throws InterruptedException {
657          synchronized (flushedLoopsLock) {
658            while (flushedLoops.isEmpty() && !walkersStop) {
659              flushedLoopsLock.wait();
660            }
661            if (walkersStop) {
662              throw new InterruptedException();
663            }
664            return flushedLoops.get(ThreadLocalRandom.current().nextInt(flushedLoops.size()));
665          }
666        }
667      }
668
669      public static class ConcurrentWalker extends WalkerBase {
670
671        Context context;
672
673        public ConcurrentWalker(Context context) {
674          this.context = context;
675        }
676
677        public void run(long startKeyIn, long maxQueriesIn) throws IOException {
678
679          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
680          byte[] startKey = Bytes.toBytes(startKeyIn);
681
682          Connection connection = ConnectionFactory.createConnection(getConf());
683          Table table = connection.getTable(getTableName(getConf()));
684          long numQueries = 0;
685          // If isSpecificStart is set, only walk one list from that particular node.
686          // Note that in case of circular (or P-shaped) list it will walk forever, as is
687          // the case in normal run without startKey.
688
689          CINode node = findStartNode(table, startKey);
690          if (node == null) {
691            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
692            throw new IOException("Start node not found: " + startKeyIn);
693          }
694          while (numQueries < maxQueries) {
695            numQueries++;
696            byte[] prev = node.prev;
697            node = getNode(prev, table, node);
698            if (node == null) {
699              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
700              context.getCounter(GeneratorCounts.UNDEFINED).increment(1l);
701            } else if (node.prev.length == NO_KEY.length) {
702              LOG.error(
703                "ConcurrentWalker found TERMINATING NODE: " + Bytes.toStringBinary(node.key));
704              context.getCounter(GeneratorCounts.TERMINATING).increment(1l);
705            } else {
706              // Increment for successful walk
707              context.getCounter(GeneratorCounts.SUCCESS).increment(1l);
708            }
709          }
710          table.close();
711          connection.close();
712        }
713      }
714    }
715
716    @Override
717    public int run(String[] args) throws Exception {
718      if (args.length < 3) {
719        System.err.println(USAGE);
720        return 1;
721      }
722      try {
723        int numMappers = Integer.parseInt(args[0]);
724        long numNodes = Long.parseLong(args[1]);
725        Path tmpOutput = new Path(args[2]);
726        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
727        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
728        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
729        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
730      } catch (NumberFormatException e) {
731        System.err.println("Parsing generator arguments failed: " + e.getMessage());
732        System.err.println(USAGE);
733        return 1;
734      }
735    }
736
737    protected void createSchema() throws IOException {
738      Configuration conf = getConf();
739      TableName tableName = getTableName(conf);
740      try (Connection conn = ConnectionFactory.createConnection(conf);
741        Admin admin = conn.getAdmin()) {
742        if (!admin.tableExists(tableName)) {
743          TableDescriptor tableDescriptor = TableDescriptorBuilder
744            .newBuilder(getTableName(getConf()))
745            // if -DuseMob=true force all data through mob path.
746            .setColumnFamily(
747              setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME)).build())
748            // Always add these families. Just skip writing to them when we do not test per CF
749            // flush.
750            .setColumnFamily(
751              setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(BIG_FAMILY_NAME))
752                .build())
753            .setColumnFamily(
754              setMobProperties(conf, ColumnFamilyDescriptorBuilder.newBuilder(TINY_FAMILY_NAME))
755                .build())
756            .build();
757
758          // If we want to pre-split compute how many splits.
759          if (conf.getBoolean(PRESPLIT_TEST_TABLE_KEY, PRESPLIT_TEST_TABLE)) {
760            int numberOfServers = admin.getRegionServers().size();
761            if (numberOfServers == 0) {
762              throw new IllegalStateException("No live regionservers");
763            }
764            int regionsPerServer = conf.getInt(REGIONS_PER_SERVER_KEY, DEFAULT_REGIONS_PER_SERVER);
765            int totalNumberOfRegions = numberOfServers * regionsPerServer;
766            LOG.info("Number of live regionservers: " + numberOfServers + ", "
767              + "pre-splitting table into " + totalNumberOfRegions + " regions "
768              + "(default regions per server: " + regionsPerServer + ")");
769
770            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
771
772            admin.createTable(tableDescriptor, splits);
773          } else {
774            // Looks like we're just letting things play out.
775            // Create a table with on region by default.
776            // This will make the splitting work hard.
777            admin.createTable(tableDescriptor);
778          }
779        }
780      } catch (MasterNotRunningException e) {
781        LOG.error("Master not running", e);
782        throw new IOException(e);
783      }
784    }
785
786    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width,
787      Integer wrapMultiplier, Integer numWalkers) throws Exception {
788      LOG.info(
789        "Running RandomInputGenerator with numMappers=" + numMappers + ", numNodes=" + numNodes);
790      Job job = Job.getInstance(getConf());
791
792      job.setJobName("Random Input Generator");
793      job.setNumReduceTasks(0);
794      job.setJarByClass(getClass());
795
796      job.setInputFormatClass(GeneratorInputFormat.class);
797      job.setOutputKeyClass(BytesWritable.class);
798      job.setOutputValueClass(NullWritable.class);
799
800      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
801
802      job.setMapperClass(Mapper.class); // identity mapper
803
804      FileOutputFormat.setOutputPath(job, tmpOutput);
805      job.setOutputFormatClass(SequenceFileOutputFormat.class);
806      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);
807
808      boolean success = jobCompletion(job);
809
810      return success ? 0 : 1;
811    }
812
813    public int runGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width,
814      Integer wrapMultiplier, Integer numWalkers) throws Exception {
815      LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
816      createSchema();
817      job = Job.getInstance(getConf());
818
819      job.setJobName("Link Generator");
820      job.setNumReduceTasks(0);
821      job.setJarByClass(getClass());
822
823      FileInputFormat.setInputPaths(job, tmpOutput);
824      job.setInputFormatClass(OneFilePerMapperSFIF.class);
825      job.setOutputKeyClass(NullWritable.class);
826      job.setOutputValueClass(NullWritable.class);
827
828      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);
829
830      setMapperForGenerator(job);
831
832      job.setOutputFormatClass(NullOutputFormat.class);
833
834      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
835      TableMapReduceUtil.addDependencyJars(job);
836      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
837        AbstractHBaseTool.class);
838      TableMapReduceUtil.initCredentials(job);
839
840      boolean success = jobCompletion(job);
841
842      return success ? 0 : 1;
843    }
844
845    protected boolean jobCompletion(Job job)
846      throws IOException, InterruptedException, ClassNotFoundException {
847      boolean success = job.waitForCompletion(true);
848      return success;
849    }
850
851    protected void setMapperForGenerator(Job job) {
852      job.setMapperClass(GeneratorMapper.class);
853    }
854
855    public int run(int numMappers, long numNodes, Path tmpOutput, Integer width,
856      Integer wrapMultiplier, Integer numWalkers) throws Exception {
857      int ret =
858        runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
859      if (ret > 0) {
860        return ret;
861      }
862      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
863    }
864
865    public boolean verify() {
866      try {
867        Counters counters = job.getCounters();
868        if (counters == null) {
869          LOG.info("Counters object was null, Generator verification cannot be performed."
870            + " This is commonly a result of insufficient YARN configuration.");
871          return false;
872        }
873
874        if (
875          counters.findCounter(GeneratorCounts.TERMINATING).getValue() > 0
876            || counters.findCounter(GeneratorCounts.UNDEFINED).getValue() > 0
877            || counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue() > 0
878        ) {
879          LOG.error("Concurrent walker failed to verify during Generation phase");
880          LOG.error(
881            "TERMINATING nodes: " + counters.findCounter(GeneratorCounts.TERMINATING).getValue());
882          LOG.error(
883            "UNDEFINED nodes: " + counters.findCounter(GeneratorCounts.UNDEFINED).getValue());
884          LOG.error(
885            "IOEXCEPTION nodes: " + counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue());
886          return false;
887        }
888      } catch (IOException e) {
889        LOG.info("Generator verification could not find counter");
890        return false;
891      }
892      return true;
893    }
894  }
895
896  private static ColumnFamilyDescriptorBuilder setMobProperties(final Configuration conf,
897    final ColumnFamilyDescriptorBuilder builder) {
898    if (conf.getBoolean("useMob", false)) {
899      builder.setMobEnabled(true);
900      builder.setMobThreshold(4);
901    }
902    return builder;
903  }
904
905  /**
906   * Tool to search missing rows in WALs and hfiles. Pass in file or dir of keys to search for. Key
907   * file must have been written by Verify step (we depend on the format it writes out. We'll read
908   * them in and then search in hbase WALs and oldWALs dirs (Some of this is TODO).
909   */
910  static class Search extends Configured implements Tool {
911    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
912    protected Job job;
913
914    private static void printUsage(final String error) {
915      if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
916      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
917    }
918
919    @Override
920    public int run(String[] args) throws Exception {
921      if (args.length < 1 || args.length > 2) {
922        printUsage(null);
923        return 1;
924      }
925      Path inputDir = new Path(args[0]);
926      int numMappers = 1;
927      if (args.length > 1) {
928        numMappers = Integer.parseInt(args[1]);
929      }
930      return run(inputDir, numMappers);
931    }
932
933    /**
934     * WALPlayer override that searches for keys loaded in the setup.
935     */
936    public static class WALSearcher extends WALPlayer {
937      public WALSearcher(Configuration conf) {
938        super(conf);
939      }
940
941      /**
942       * The actual searcher mapper.
943       */
944      public static class WALMapperSearcher extends WALMapper {
945        private SortedSet<byte[]> keysToFind;
946        private AtomicInteger rows = new AtomicInteger(0);
947
948        @Override
949        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
950          throws IOException {
951          super.setup(context);
952          try {
953            this.keysToFind = readKeysToSearch(context.getConfiguration());
954            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
955          } catch (InterruptedException e) {
956            throw new InterruptedIOException(e.toString());
957          }
958        }
959
960        @Override
961        protected boolean filter(Context context, Cell cell) {
962          // TODO: Can I do a better compare than this copying out key?
963          byte[] row = new byte[cell.getRowLength()];
964          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
965          boolean b = this.keysToFind.contains(row);
966          if (b) {
967            String keyStr = Bytes.toStringBinary(row);
968            try {
969              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
970            } catch (IOException | InterruptedException e) {
971              LOG.warn(e.toString(), e);
972            }
973            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
974              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
975            }
976            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
977          }
978          return b;
979        }
980      }
981
982      // Put in place the above WALMapperSearcher.
983      @Override
984      public Job createSubmittableJob(String[] args) throws IOException {
985        Job job = super.createSubmittableJob(args);
986        // Call my class instead.
987        job.setJarByClass(WALMapperSearcher.class);
988        job.setMapperClass(WALMapperSearcher.class);
989        job.setOutputFormatClass(NullOutputFormat.class);
990        return job;
991      }
992    }
993
994    static final String FOUND_GROUP_KEY = "Found";
995    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
996
997    public int run(Path inputDir, int numMappers) throws Exception {
998      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
999      SortedSet<byte[]> keys = readKeysToSearch(getConf());
1000      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
1001      LOG.info("Count of keys to find: " + keys.size());
1002      for (byte[] key : keys)
1003        LOG.info("Key: " + Bytes.toStringBinary(key));
1004      // Now read all WALs. In two dirs. Presumes certain layout.
1005      Path walsDir =
1006        new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
1007      Path oldWalsDir =
1008        new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
1009      LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers
1010        + " against " + getConf().get(HConstants.HBASE_DIR));
1011      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
1012        new String[] { walsDir.toString(), "" });
1013      if (ret != 0) {
1014        return ret;
1015      }
1016      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
1017        new String[] { oldWalsDir.toString(), "" });
1018    }
1019
1020    static SortedSet<byte[]> readKeysToSearch(final Configuration conf)
1021      throws IOException, InterruptedException {
1022      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
1023      FileSystem fs = FileSystem.get(conf);
1024      SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1025      if (!fs.exists(keysInputDir)) {
1026        throw new FileNotFoundException(keysInputDir.toString());
1027      }
1028      if (!fs.isDirectory(keysInputDir)) {
1029        throw new UnsupportedOperationException("TODO");
1030      } else {
1031        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
1032        while (iterator.hasNext()) {
1033          LocatedFileStatus keyFileStatus = iterator.next();
1034          // Skip "_SUCCESS" file.
1035          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
1036          result.addAll(readFileToSearch(conf, keyFileStatus));
1037        }
1038      }
1039      return result;
1040    }
1041
1042    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
1043      final LocatedFileStatus keyFileStatus) throws IOException, InterruptedException {
1044      SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1045      // Return entries that are flagged VerifyCounts.UNDEFINED in the value. Return the row. This
1046      // is
1047      // what is missing.
1048      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
1049      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
1050        new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
1051        InputSplit is =
1052          new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String[] {});
1053        rr.initialize(is, context);
1054        while (rr.nextKeyValue()) {
1055          rr.getCurrentKey();
1056          BytesWritable bw = rr.getCurrentValue();
1057          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.VerifyCounts.UNDEFINED) {
1058            byte[] key = new byte[rr.getCurrentKey().getLength()];
1059            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
1060              rr.getCurrentKey().getLength());
1061            result.add(key);
1062          }
1063        }
1064      }
1065      return result;
1066    }
1067  }
1068
1069  /**
1070   * A Map Reduce job that verifies that the linked lists generated by {@link Generator} do not have
1071   * any holes.
1072   */
1073  static class Verify extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
    // Marker value VerifyMapper emits for every row it scans (the row is "defined").
    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
    // Emitted instead of DEF when multiple uneven column families are enabled and the row lacks
    // the big or tiny family column. The reducer detects it by its first byte being 1.
    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
    // Most recent verification job; set by run(Path, int) and read by verify(long).
    protected Job job;
1078
1079    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
1080      private BytesWritable row = new BytesWritable();
1081      private BytesWritable ref = new BytesWritable();
1082      private boolean multipleUnevenColumnFamilies;
1083
1084      @Override
1085      protected void
1086        setup(Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
1087          throws IOException, InterruptedException {
1088        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
1089      }
1090
1091      @Override
1092      protected void map(ImmutableBytesWritable key, Result value, Context context)
1093        throws IOException, InterruptedException {
1094        byte[] rowKey = key.get();
1095        row.set(rowKey, 0, rowKey.length);
1096        if (
1097          multipleUnevenColumnFamilies && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME)
1098            || !value.containsColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME))
1099        ) {
1100          context.write(row, DEF_LOST_FAMILIES);
1101        } else {
1102          context.write(row, DEF);
1103        }
1104        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
1105        if (prev != null && prev.length > 0) {
1106          ref.set(prev, 0, prev.length);
1107          context.write(ref, row);
1108        } else {
1109          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
1110        }
1111      }
1112    }
1113
1114    /**
1115     * Don't change the order of these enums. Their ordinals are used as type flag when we emit
1116     * problems found from the reducer.
1117     */
1118    public static enum VerifyCounts {
1119      UNREFERENCED,
1120      UNDEFINED,
1121      REFERENCED,
1122      CORRUPT,
1123      EXTRAREFERENCES,
1124      EXTRA_UNDEF_REFERENCES,
1125      LOST_FAMILIES
1126    }
1127
1128    /**
1129     * Per reducer, we output problem rows as byte arrays so can be used as input for subsequent
1130     * investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag saying what
1131     * sort of emission it is. Flag is the Count enum ordinal as a short.
1132     */
1133    public static class VerifyReducer
1134      extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
1135      private ArrayList<byte[]> refs = new ArrayList<>();
1136      private final BytesWritable UNREF =
1137        new BytesWritable(addPrefixFlag(VerifyCounts.UNREFERENCED.ordinal(), new byte[] {}));
1138      private final BytesWritable LOSTFAM =
1139        new BytesWritable(addPrefixFlag(VerifyCounts.LOST_FAMILIES.ordinal(), new byte[] {}));
1140
1141      private AtomicInteger rows = new AtomicInteger(0);
1142      private Connection connection;
1143
1144      @Override
1145      protected void
1146        setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1147          throws IOException, InterruptedException {
1148        super.setup(context);
1149        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
1150      }
1151
1152      @Override
1153      protected void
1154        cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
1155          throws IOException, InterruptedException {
1156        if (this.connection != null) {
1157          this.connection.close();
1158        }
1159        super.cleanup(context);
1160      }
1161
1162      /**
1163       * Returns new byte array that has <code>ordinal</code> as prefix on front taking up
1164       * Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
1165       */
1166      public static byte[] addPrefixFlag(final int ordinal, final byte[] r) {
1167        byte[] prefix = Bytes.toBytes((short) ordinal);
1168        if (prefix.length != Bytes.SIZEOF_SHORT) {
1169          throw new RuntimeException("Unexpected size: " + prefix.length);
1170        }
1171        byte[] result = new byte[prefix.length + r.length];
1172        System.arraycopy(prefix, 0, result, 0, prefix.length);
1173        System.arraycopy(r, 0, result, prefix.length, r.length);
1174        return result;
1175      }
1176
1177      /**
1178       * Returns type from the Counts enum of this row. Reads prefix added by
1179       * {@link #addPrefixFlag(int, byte[])}
1180       */
1181      public static VerifyCounts whichType(final byte[] bs) {
1182        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
1183        return VerifyCounts.values()[ordinal];
1184      }
1185
1186      /** Returns Row bytes minus the type flag. */
1187      public static byte[] getRowOnly(BytesWritable bw) {
1188        byte[] bytes = new byte[bw.getLength() - Bytes.SIZEOF_SHORT];
1189        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
1190        return bytes;
1191      }
1192
1193      @Override
1194      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
1195        throws IOException, InterruptedException {
1196        int defCount = 0;
1197        boolean lostFamilies = false;
1198        refs.clear();
1199        for (BytesWritable type : values) {
1200          if (type.getLength() == DEF.getLength()) {
1201            defCount++;
1202            if (type.getBytes()[0] == 1) {
1203              lostFamilies = true;
1204            }
1205          } else {
1206            byte[] bytes = new byte[type.getLength()];
1207            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
1208            refs.add(bytes);
1209          }
1210        }
1211
1212        // TODO check for more than one def, should not happen
1213        StringBuilder refsSb = null;
1214        if (defCount == 0 || refs.size() != 1) {
1215          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1216          refsSb = dumpExtraInfoOnRefs(key, context, refs);
1217          LOG.error("LinkedListError: key=" + keyString + ", reference(s)="
1218            + (refsSb != null ? refsSb.toString() : ""));
1219        }
1220        if (lostFamilies) {
1221          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1222          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
1223          context.getCounter(VerifyCounts.LOST_FAMILIES).increment(1);
1224          context.write(key, LOSTFAM);
1225        }
1226
1227        if (defCount == 0 && refs.size() > 0) {
1228          // This is bad, found a node that is referenced but not defined. It must have been
1229          // lost, emit some info about this node for debugging purposes.
1230          // Write out a line per reference. If more than one, flag it.;
1231          for (int i = 0; i < refs.size(); i++) {
1232            byte[] bs = refs.get(i);
1233            int ordinal;
1234            if (i <= 0) {
1235              ordinal = VerifyCounts.UNDEFINED.ordinal();
1236              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1237              context.getCounter(VerifyCounts.UNDEFINED).increment(1);
1238            } else {
1239              ordinal = VerifyCounts.EXTRA_UNDEF_REFERENCES.ordinal();
1240              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
1241            }
1242          }
1243          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1244            // Print out missing row; doing get on reference gives info on when the referencer
1245            // was added which can help a little debugging. This info is only available in mapper
1246            // output -- the 'Linked List error Key...' log message above. What we emit here is
1247            // useless for debugging.
1248            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1249            context.getCounter("undef", keyString).increment(1);
1250          }
1251        } else if (defCount > 0 && refs.isEmpty()) {
1252          // node is defined but not referenced
1253          context.write(key, UNREF);
1254          context.getCounter(VerifyCounts.UNREFERENCED).increment(1);
1255          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
1256            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
1257            context.getCounter("unref", keyString).increment(1);
1258          }
1259        } else {
1260          if (refs.size() > 1) {
1261            // Skip first reference.
1262            for (int i = 1; i < refs.size(); i++) {
1263              context.write(key, new BytesWritable(
1264                addPrefixFlag(VerifyCounts.EXTRAREFERENCES.ordinal(), refs.get(i))));
1265            }
1266            context.getCounter(VerifyCounts.EXTRAREFERENCES).increment(refs.size() - 1);
1267          }
1268          // node is defined and referenced
1269          context.getCounter(VerifyCounts.REFERENCED).increment(1);
1270        }
1271      }
1272
1273      /**
1274       * Dump out extra info around references if there are any. Helps debugging.
1275       * @return StringBuilder filled with references if any.
1276       */
1277      @SuppressWarnings("JavaUtilDate")
1278      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1279        final List<byte[]> refs) throws IOException {
1280        StringBuilder refsSb = null;
1281        if (refs.isEmpty()) return refsSb;
1282        refsSb = new StringBuilder();
1283        String comma = "";
1284        // If a row is a reference but has no define, print the content of the row that has
1285        // this row as a 'prev'; it will help debug. The missing row was written just before
1286        // the row we are dumping out here.
1287        TableName tn = getTableName(context.getConfiguration());
1288        try (Table t = this.connection.getTable(tn)) {
1289          for (byte[] ref : refs) {
1290            Result r = t.get(new Get(ref));
1291            List<Cell> cells = r.listCells();
1292            String ts = (cells != null && !cells.isEmpty())
1293              ? new java.util.Date(cells.get(0).getTimestamp()).toString()
1294              : "";
1295            byte[] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1296            String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : "";
1297            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1298            long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1;
1299            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1300            String refRegionLocation = "";
1301            String keyRegionLocation = "";
1302            if (b != null && b.length > 0) {
1303              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1304                HRegionLocation hrl = rl.getRegionLocation(b);
1305                if (hrl != null) refRegionLocation = hrl.toString();
1306                // Key here probably has trailing zeros on it.
1307                hrl = rl.getRegionLocation(key.getBytes());
1308                if (hrl != null) keyRegionLocation = hrl.toString();
1309              }
1310            }
1311            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref)
1312              + ", refPrevEqualsKey="
1313              + (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0)
1314              + ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength())
1315              + ", ref row date=" + ts + ", jobStr=" + jobStr + ", ref row count=" + count
1316              + ", ref row regionLocation=" + refRegionLocation + ", key row regionLocation="
1317              + keyRegionLocation);
1318            refsSb.append(comma);
1319            comma = ",";
1320            refsSb.append(Bytes.toStringBinary(ref));
1321          }
1322        }
1323        return refsSb;
1324      }
1325    }
1326
1327    @Override
1328    public int run(String[] args) throws Exception {
1329      if (args.length != 2) {
1330        System.out
1331          .println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>");
1332        return 0;
1333      }
1334
1335      String outputDir = args[0];
1336      int numReducers = Integer.parseInt(args[1]);
1337
1338      return run(outputDir, numReducers);
1339    }
1340
1341    public int run(String outputDir, int numReducers) throws Exception {
1342      return run(new Path(outputDir), numReducers);
1343    }
1344
1345    public int run(Path outputDir, int numReducers) throws Exception {
1346      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
1347
1348      job = Job.getInstance(getConf());
1349
1350      job.setJobName("Link Verifier");
1351      job.setNumReduceTasks(numReducers);
1352      job.setJarByClass(getClass());
1353
1354      setJobScannerConf(job);
1355
1356      Scan scan = new Scan();
1357      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1358      scan.setCaching(10000);
1359      scan.setCacheBlocks(false);
1360      if (isMultiUnevenColumnFamilies(getConf())) {
1361        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1362        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1363      }
1364
1365      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1366        VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1367      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
1368        AbstractHBaseTool.class);
1369
1370      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1371
1372      job.setReducerClass(VerifyReducer.class);
1373      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1374      job.setOutputKeyClass(BytesWritable.class);
1375      job.setOutputValueClass(BytesWritable.class);
1376      TextOutputFormat.setOutputPath(job, outputDir);
1377
1378      boolean success = job.waitForCompletion(true);
1379
1380      if (success) {
1381        Counters counters = job.getCounters();
1382        if (null == counters) {
1383          LOG.warn("Counters were null, cannot verify Job completion."
1384            + " This is commonly a result of insufficient YARN configuration.");
1385          // We don't have access to the counters to know if we have "bad" counts
1386          return 0;
1387        }
1388
1389        // If we find no unexpected values, the job didn't outright fail
1390        if (verifyUnexpectedValues(counters)) {
1391          // We didn't check referenced+unreferenced counts, leave that to visual inspection
1392          return 0;
1393        }
1394      }
1395
1396      // We failed
1397      return 1;
1398    }
1399
1400    public boolean verify(long expectedReferenced) throws Exception {
1401      if (job == null) {
1402        throw new IllegalStateException("You should call run() first");
1403      }
1404
1405      Counters counters = job.getCounters();
1406      if (counters == null) {
1407        LOG.info("Counters object was null, write verification cannot be performed."
1408          + " This is commonly a result of insufficient YARN configuration.");
1409        return false;
1410      }
1411
1412      // Run through each check, even if we fail one early
1413      boolean success = verifyExpectedValues(expectedReferenced, counters);
1414
1415      if (!verifyUnexpectedValues(counters)) {
1416        // We found counter objects which imply failure
1417        success = false;
1418      }
1419
1420      if (!success) {
1421        handleFailure(counters);
1422      }
1423      return success;
1424    }
1425
1426    /**
1427     * Verify the values in the Counters against the expected number of entries written. Expected
1428     * number of referenced entrires The Job's Counters object
1429     * @return True if the values match what's expected, false otherwise
1430     */
1431    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1432      final Counter referenced = counters.findCounter(VerifyCounts.REFERENCED);
1433      final Counter unreferenced = counters.findCounter(VerifyCounts.UNREFERENCED);
1434      boolean success = true;
1435
1436      if (expectedReferenced != referenced.getValue()) {
1437        LOG.error("Expected referenced count does not match with actual referenced count. "
1438          + "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
1439        success = false;
1440      }
1441
1442      if (unreferenced.getValue() > 0) {
1443        final Counter multiref = counters.findCounter(VerifyCounts.EXTRAREFERENCES);
1444        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
1445        LOG.error(
1446          "Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
1447            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1448        success = false;
1449      }
1450
1451      return success;
1452    }
1453
1454    /**
1455     * Verify that the Counters don't contain values which indicate an outright failure from the
1456     * Reducers. The Job's counters
1457     * @return True if the "bad" counter objects are 0, false otherwise
1458     */
1459    protected boolean verifyUnexpectedValues(Counters counters) {
1460      final Counter undefined = counters.findCounter(VerifyCounts.UNDEFINED);
1461      final Counter lostfamilies = counters.findCounter(VerifyCounts.LOST_FAMILIES);
1462      boolean success = true;
1463
1464      if (undefined.getValue() > 0) {
1465        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1466        success = false;
1467      }
1468
1469      if (lostfamilies.getValue() > 0) {
1470        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1471        success = false;
1472      }
1473
1474      return success;
1475    }
1476
1477    protected void handleFailure(Counters counters) throws IOException {
1478      Configuration conf = job.getConfiguration();
1479      TableName tableName = getTableName(conf);
1480      try (Connection conn = ConnectionFactory.createConnection(conf)) {
1481        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
1482          CounterGroup g = counters.getGroup("undef");
1483          Iterator<Counter> it = g.iterator();
1484          while (it.hasNext()) {
1485            String keyString = it.next().getName();
1486            byte[] key = Bytes.toBytes(keyString);
1487            HRegionLocation loc = rl.getRegionLocation(key, true);
1488            LOG.error("undefined row " + keyString + ", " + loc);
1489          }
1490          g = counters.getGroup("unref");
1491          it = g.iterator();
1492          while (it.hasNext()) {
1493            String keyString = it.next().getName();
1494            byte[] key = Bytes.toBytes(keyString);
1495            HRegionLocation loc = rl.getRegionLocation(key, true);
1496            LOG.error("unreferred row " + keyString + ", " + loc);
1497          }
1498        }
1499      }
1500    }
1501  }
1502
1503  /**
1504   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
1505   * adds more data.
1506   */
1507  static class Loop extends Configured implements Tool {
1508
1509    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
1510    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> "
1511      + "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>"
1512      + " <num walker threads>] \n"
1513      + "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n"
1514      + "walkers will select and verify random flushed loop during Generation.";
1515
1516    IntegrationTestBigLinkedList it;
1517
1518    protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
1519      Integer wrapMultiplier, Integer numWalkers) throws Exception {
1520      Path outputPath = new Path(outputDir);
1521      UUID uuid = UUID.randomUUID(); // create a random UUID.
1522      Path generatorOutput = new Path(outputPath, uuid.toString());
1523
1524      Generator generator = new Generator();
1525      generator.setConf(getConf());
1526      int retCode =
1527        generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers);
1528      if (retCode > 0) {
1529        throw new RuntimeException("Generator failed with return code: " + retCode);
1530      }
1531      if (numWalkers > 0) {
1532        if (!generator.verify()) {
1533          throw new RuntimeException("Generator.verify failed");
1534        }
1535      }
1536      LOG.info("Generator finished with success. Total nodes=" + numNodes);
1537    }
1538
1539    protected void runVerify(String outputDir, int numReducers, long expectedNumNodes)
1540      throws Exception {
1541      Path outputPath = new Path(outputDir);
1542      UUID uuid = UUID.randomUUID(); // create a random UUID.
1543      Path iterationOutput = new Path(outputPath, uuid.toString());
1544
1545      Verify verify = new Verify();
1546      verify.setConf(getConf());
1547      int retCode = verify.run(iterationOutput, numReducers);
1548      if (retCode > 0) {
1549        throw new RuntimeException("Verify.run failed with return code: " + retCode);
1550      }
1551
1552      if (!verify.verify(expectedNumNodes)) {
1553        throw new RuntimeException("Verify.verify failed");
1554      }
1555      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
1556    }
1557
1558    @Override
1559    public int run(String[] args) throws Exception {
1560      if (args.length < 5) {
1561        System.err.println(USAGE);
1562        return 1;
1563      }
1564      try {
1565        int numIterations = Integer.parseInt(args[0]);
1566        int numMappers = Integer.parseInt(args[1]);
1567        long numNodes = Long.parseLong(args[2]);
1568        String outputDir = args[3];
1569        int numReducers = Integer.parseInt(args[4]);
1570        Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1571        Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1572        Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]);
1573
1574        long expectedNumNodes = 0;
1575
1576        if (numIterations < 0) {
1577          numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
1578        }
1579        LOG.info("Running Loop with args:" + Arrays.deepToString(args));
1580        for (int i = 0; i < numIterations; i++) {
1581          LOG.info("Starting iteration = " + i);
1582          runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers);
1583          expectedNumNodes += numMappers * numNodes;
1584          runVerify(outputDir, numReducers, expectedNumNodes);
1585        }
1586        return 0;
1587      } catch (NumberFormatException e) {
1588        System.err.println("Parsing loop arguments failed: " + e.getMessage());
1589        System.err.println(USAGE);
1590        return 1;
1591      }
1592    }
1593  }
1594
1595  /**
1596   * A stand alone program that prints out portions of a list created by {@link Generator}
1597   */
1598  private static class Print extends Configured implements Tool {
1599    @Override
1600    public int run(String[] args) throws Exception {
1601      Options options = new Options();
1602      options.addOption("s", "start", true, "start key");
1603      options.addOption("e", "end", true, "end key");
1604      options.addOption("l", "limit", true, "number to print");
1605
1606      GnuParser parser = new GnuParser();
1607      CommandLine cmd = null;
1608      try {
1609        cmd = parser.parse(options, args);
1610        if (cmd.getArgs().length != 0) {
1611          throw new ParseException("Command takes no arguments");
1612        }
1613      } catch (ParseException e) {
1614        System.err.println("Failed to parse command line " + e.getMessage());
1615        System.err.println();
1616        HelpFormatter formatter = new HelpFormatter();
1617        formatter.printHelp(getClass().getSimpleName(), options);
1618        System.exit(-1);
1619      }
1620
1621      Connection connection = ConnectionFactory.createConnection(getConf());
1622      Table table = connection.getTable(getTableName(getConf()));
1623
1624      Scan scan = new Scan();
1625      scan.setBatch(10000);
1626
1627      if (cmd.hasOption("s")) scan.withStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
1628
1629      if (cmd.hasOption("e")) {
1630        scan.withStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
1631      }
1632
1633      int limit = 0;
1634      if (cmd.hasOption("l")) limit = Integer.parseInt(cmd.getOptionValue("l"));
1635      else limit = 100;
1636
1637      ResultScanner scanner = table.getScanner(scan);
1638
1639      CINode node = new CINode();
1640      Result result = scanner.next();
1641      int count = 0;
1642      while (result != null && count++ < limit) {
1643        node = getCINode(result, node);
1644        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
1645          Bytes.toStringBinary(node.prev), node.count, node.client);
1646        result = scanner.next();
1647      }
1648      scanner.close();
1649      table.close();
1650      connection.close();
1651
1652      return 0;
1653    }
1654  }
1655
1656  /**
1657   * A stand alone program that deletes a single node.
1658   */
1659  private static class Delete extends Configured implements Tool {
1660    @Override
1661    public int run(String[] args) throws Exception {
1662      if (args.length != 1) {
1663        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
1664        return 0;
1665      }
1666      byte[] val = Bytes.toBytesBinary(args[0]);
1667
1668      org.apache.hadoop.hbase.client.Delete delete = new org.apache.hadoop.hbase.client.Delete(val);
1669
1670      try (Connection connection = ConnectionFactory.createConnection(getConf());
1671        Table table = connection.getTable(getTableName(getConf()))) {
1672        table.delete(delete);
1673      }
1674
1675      System.out.println("Delete successful");
1676      return 0;
1677    }
1678  }
1679
1680  abstract static class WalkerBase extends Configured {
1681    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
1682      Scan scan = new Scan();
1683      scan.withStartRow(startKey);
1684      scan.setBatch(1);
1685      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1686
1687      long t1 = EnvironmentEdgeManager.currentTime();
1688      ResultScanner scanner = table.getScanner(scan);
1689      Result result = scanner.next();
1690      long t2 = EnvironmentEdgeManager.currentTime();
1691      scanner.close();
1692
1693      if (result != null) {
1694        CINode node = getCINode(result, new CINode());
1695        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1696        return node;
1697      }
1698
1699      System.out.println("FSR " + (t2 - t1));
1700
1701      return null;
1702    }
1703
1704    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1705      Get get = new Get(row);
1706      get.addColumn(FAMILY_NAME, COLUMN_PREV);
1707      Result result = table.get(get);
1708      return getCINode(result, node);
1709    }
1710  }
1711
1712  /**
1713   * A stand alone program that follows a linked list created by {@link Generator} and prints timing
1714   * info.
1715   */
1716  private static class Walker extends WalkerBase implements Tool {
1717
1718    public Walker() {
1719    }
1720
1721    @Override
1722    public int run(String[] args) throws IOException {
1723
1724      Options options = new Options();
1725      options.addOption("n", "num", true, "number of queries");
1726      options.addOption("s", "start", true, "key to start at, binary string");
1727      options.addOption("l", "logevery", true, "log every N queries");
1728
1729      GnuParser parser = new GnuParser();
1730      CommandLine cmd = null;
1731      try {
1732        cmd = parser.parse(options, args);
1733        if (cmd.getArgs().length != 0) {
1734          throw new ParseException("Command takes no arguments");
1735        }
1736      } catch (ParseException e) {
1737        System.err.println("Failed to parse command line " + e.getMessage());
1738        System.err.println();
1739        HelpFormatter formatter = new HelpFormatter();
1740        formatter.printHelp(getClass().getSimpleName(), options);
1741        System.exit(-1);
1742      }
1743
1744      long maxQueries = Long.MAX_VALUE;
1745      if (cmd.hasOption('n')) {
1746        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1747      }
1748      boolean isSpecificStart = cmd.hasOption('s');
1749
1750      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1751      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1752
1753      Connection connection = ConnectionFactory.createConnection(getConf());
1754      Table table = connection.getTable(getTableName(getConf()));
1755      long numQueries = 0;
1756      // If isSpecificStart is set, only walk one list from that particular node.
1757      // Note that in case of circular (or P-shaped) list it will walk forever, as is
1758      // the case in normal run without startKey.
1759      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1760        if (!isSpecificStart) {
1761          startKey = new byte[ROWKEY_LENGTH];
1762          Bytes.random(startKey);
1763        }
1764        CINode node = findStartNode(table, startKey);
1765        if (node == null && isSpecificStart) {
1766          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1767        }
1768        numQueries++;
1769        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
1770          byte[] prev = node.prev;
1771          long t1 = EnvironmentEdgeManager.currentTime();
1772          node = getNode(prev, table, node);
1773          long t2 = EnvironmentEdgeManager.currentTime();
1774          if (logEvery > 0 && numQueries % logEvery == 0) {
1775            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1776          }
1777          numQueries++;
1778          if (node == null) {
1779            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1780          } else if (node.prev.length == NO_KEY.length) {
1781            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1782          }
1783        }
1784      }
1785      table.close();
1786      connection.close();
1787      return 0;
1788    }
1789  }
1790
1791  private static class Clean extends Configured implements Tool {
1792    @Override
1793    public int run(String[] args) throws Exception {
1794      if (args.length < 1) {
1795        System.err.println("Usage: Clean <output dir>");
1796        return -1;
1797      }
1798
1799      Path p = new Path(args[0]);
1800      Configuration conf = getConf();
1801      TableName tableName = getTableName(conf);
1802      try (FileSystem fs = HFileSystem.get(conf);
1803        Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin()) {
1804        if (admin.tableExists(tableName)) {
1805          admin.disableTable(tableName);
1806          admin.deleteTable(tableName);
1807        }
1808
1809        if (fs.exists(p)) {
1810          fs.delete(p, true);
1811        }
1812      }
1813
1814      return 0;
1815    }
1816  }
1817
1818  static TableName getTableName(Configuration conf) {
1819    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1820  }
1821
1822  private static CINode getCINode(Result result, CINode node) {
1823    node.key = Bytes.copy(result.getRow());
1824    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1825      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1826    } else {
1827      node.prev = NO_KEY;
1828    }
1829    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1830      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1831    } else {
1832      node.count = -1;
1833    }
1834    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1835      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1836    } else {
1837      node.client = "";
1838    }
1839    return node;
1840  }
1841
1842  @Override
1843  public void setUpCluster() throws Exception {
1844    util = getTestingUtil(getConf());
1845    boolean isDistributed = util.isDistributedCluster();
1846    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
1847    if (!isDistributed) {
1848      util.startMiniMapReduceCluster();
1849    }
1850    this.setConf(util.getConfiguration());
1851  }
1852
1853  @Override
1854  public void cleanUpCluster() throws Exception {
1855    super.cleanUpCluster();
1856    if (util.isDistributedCluster()) {
1857      util.shutdownMiniMapReduceCluster();
1858    }
1859  }
1860
1861  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
1862    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
1863  }
1864
1865  @Test
1866  public void testContinuousIngest() throws IOException, Exception {
1867    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1868    Configuration conf = getTestingUtil(getConf()).getConfiguration();
1869    if (isMultiUnevenColumnFamilies(getConf())) {
1870      // make sure per CF flush is on
1871      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
1872        FlushAllLargeStoresPolicy.class.getName());
1873    }
1874    int ret = ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1875      util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1876    assertEquals(0, ret);
1877  }
1878
1879  private void usage() {
1880    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1881    printCommands();
1882  }
1883
1884  private void printCommands() {
1885    System.err.println("Commands:");
1886    System.err.println(" generator  Map only job that generates data.");
1887    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
1888    System.err.println("            look at the counts after running. See REFERENCED and");
1889    System.err.println("            UNREFERENCED are ok. Any UNDEFINED counts are bad. Do not run");
1890    System.err.println("            with the Generator.");
1891    System.err.println(" walker     "
1892      + "Standalone program that starts following a linked list & emits timing info.");
1893    System.err.println(" print      Standalone program that prints nodes in the linked list.");
1894    System.err.println(" delete     Standalone program that deletes a single node.");
1895    System.err.println(" loop       Program to Loop through Generator and Verify steps");
1896    System.err.println(" clean      Program to clean all left over detritus.");
1897    System.err.println(" search     Search for missing keys.");
1898    System.err.println("");
1899    System.err.println("General options:");
1900    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
1901    System.err.println(
1902      "    Run using the <tableName> as the tablename.  Defaults to " + DEFAULT_TABLE_NAME);
1903    System.err.println(" -D" + REGIONS_PER_SERVER_KEY + "=<# regions>");
1904    System.err.println("    Create table with presplit regions per server.  Defaults to "
1905      + DEFAULT_REGIONS_PER_SERVER);
1906
1907    System.err.println(" -DuseMob=<true|false>");
1908    System.err.println(
1909      "    Create table so that the mob read/write path is forced.  " + "Defaults to false");
1910
1911    System.err.flush();
1912  }
1913
1914  @Override
1915  protected void processOptions(CommandLine cmd) {
1916    super.processOptions(cmd);
1917    String[] args = cmd.getArgs();
1918    // get the class, run with the conf
1919    if (args.length < 1) {
1920      printUsage(this.getClass().getSimpleName() + " <general options> COMMAND [<COMMAND options>]",
1921        "General options:", "");
1922      printCommands();
1923      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
1924      throw new RuntimeException("Incorrect Number of args.");
1925    }
1926    toRun = args[0];
1927    otherArgs = Arrays.copyOfRange(args, 1, args.length);
1928  }
1929
1930  @Override
1931  public int runTestFromCommandLine() throws Exception {
1932    Tool tool = null;
1933    if (toRun.equalsIgnoreCase("Generator")) {
1934      tool = new Generator();
1935    } else if (toRun.equalsIgnoreCase("Verify")) {
1936      tool = new Verify();
1937    } else if (toRun.equalsIgnoreCase("Loop")) {
1938      Loop loop = new Loop();
1939      loop.it = this;
1940      tool = loop;
1941    } else if (toRun.equalsIgnoreCase("Walker")) {
1942      tool = new Walker();
1943    } else if (toRun.equalsIgnoreCase("Print")) {
1944      tool = new Print();
1945    } else if (toRun.equalsIgnoreCase("Delete")) {
1946      tool = new Delete();
1947    } else if (toRun.equalsIgnoreCase("Clean")) {
1948      tool = new Clean();
1949    } else if (toRun.equalsIgnoreCase("Search")) {
1950      tool = new Search();
1951    } else {
1952      usage();
1953      throw new RuntimeException("Unknown arg");
1954    }
1955
1956    return ToolRunner.run(getConf(), tool, otherArgs);
1957  }
1958
1959  @Override
1960  public TableName getTablename() {
1961    Configuration c = getConf();
1962    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1963  }
1964
1965  @Override
1966  protected Set<String> getColumnFamilies() {
1967    if (isMultiUnevenColumnFamilies(getConf())) {
1968      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
1969        Bytes.toString(TINY_FAMILY_NAME));
1970    } else {
1971      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1972    }
1973  }
1974
1975  private static void setJobConf(Job job, int numMappers, long numNodes, Integer width,
1976    Integer wrapMultiplier, Integer numWalkers) {
1977    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1978    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1979    if (width != null) {
1980      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1981    }
1982    if (wrapMultiplier != null) {
1983      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1984    }
1985    if (numWalkers != null) {
1986      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
1987    }
1988  }
1989
1990  public static void setJobScannerConf(Job job) {
1991    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1992  }
1993
1994  public static void main(String[] args) throws Exception {
1995    Configuration conf = HBaseConfiguration.create();
1996    IntegrationTestingUtility.setUseDistributedCluster(conf);
1997    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1998    System.exit(ret);
1999  }
2000}