/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A large test which loads a lot of data that has internal references, and then verifies the data.
 * In the load step, 200 map tasks are launched, each of which writes loadmapper.num_to_write
 * (default 100K) rows to an HBase table. Rows are written in blocks, for a total of 100 blocks.
 * Each row in a block contains loadmapper.backrefs (default 50) references to random rows in the
 * previous block. The verify step scans the table and verifies that every referenced row is
 * actually present (no data loss). Failed rows are output from the reduce step and saved in the
 * job output directory in HDFS for later inspection. This class can be run as a unit test, as an
 * integration test, or from the command line. Originally taken from Apache Bigtop.
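 * <p>
 * Example command-line invocation (illustrative; assumes the test classes are on the HBase
 * classpath and a suitable configuration is in effect):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.test.IntegrationTestLoadAndVerify loadAndVerify
 * </pre>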
 */
@Category(IntegrationTests.class)
public class IntegrationTestLoadAndVerify extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadAndVerify.class);

  private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");

  private static final String NUM_TO_WRITE_KEY = "loadmapper.num_to_write";
  private static final long NUM_TO_WRITE_DEFAULT = 100 * 1000;

  private static final String TABLE_NAME_KEY = "loadmapper.table";
  private static final String TABLE_NAME_DEFAULT = "table";

  private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs";
  private static final int NUM_BACKREFS_DEFAULT = 50;

  private static final String NUM_MAP_TASKS_KEY = "loadmapper.map.tasks";
  private static final String NUM_REDUCE_TASKS_KEY = "verify.reduce.tasks";
  private static final int NUM_MAP_TASKS_DEFAULT = 200;
  private static final int NUM_REDUCE_TASKS_DEFAULT = 35;

  private static final int SCANNER_CACHING = 500;

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private String toRun = null;
  private String keysDir = null;

  private enum Counters {
    ROWS_WRITTEN,
    REFERENCES_WRITTEN,
    REFERENCES_CHECKED
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(3);
    this.setConf(util.getConfiguration());
    if (!util.isDistributedCluster()) {
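      // When running against a local minicluster rather than a real distributed cluster,
      // scale the workload down so the test finishes in a reasonable time.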
      getConf().setLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT / 100);
      getConf().setInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT / 100);
      getConf().setInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT / 10);
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  /**
   * Converts a "long" value between endian systems. Borrowed from Apache Commons IO
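   * For example, {@code swapLong(0x0011223344556677L)} returns {@code 0x7766554433221100L}.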
   * @param value value to convert
   * @return the converted value
   */
  public static long swapLong(long value) {
    return (((value >> 0) & 0xff) << 56) + (((value >> 8) & 0xff) << 48)
      + (((value >> 16) & 0xff) << 40) + (((value >> 24) & 0xff) << 32)
      + (((value >> 32) & 0xff) << 24) + (((value >> 40) & 0xff) << 16)
      + (((value >> 48) & 0xff) << 8) + (((value >> 56) & 0xff) << 0);
  }

  public static class LoadMapper
    extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {
    protected long recordsToWrite;
    protected Connection connection;
    protected BufferedMutator mutator;
    protected Configuration conf;
    protected int numBackReferencesPerRow;
    protected String shortTaskId;
    protected Counter rowsWritten, refsWritten;

    @Override
    public void setup(Context context) throws IOException {
      conf = context.getConfiguration();
      recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
      String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
      numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
      this.connection = ConnectionFactory.createConnection(conf);
      mutator = connection.getBufferedMutator(
        new BufferedMutatorParams(TableName.valueOf(tableName)).writeBufferSize(4 * 1024 * 1024));

      String taskId = conf.get("mapreduce.task.attempt.id");
      Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
      if (!matcher.matches()) {
        throw new RuntimeException("Strange task ID: " + taskId);
      }
      shortTaskId = matcher.group(1);

      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
      refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);
    }

    @Override
    public void cleanup(Context context) throws IOException {
      mutator.close();
      connection.close();
    }

    @Override
    protected void map(NullWritable key, NullWritable value, Context context)
      throws IOException, InterruptedException {

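      // Row key layout: an 8-byte, byte-swapped row index followed by "/<short task id>".
      // Byte-swapping puts the fast-changing low-order bytes first, which spreads sequentially
      // written indices across the key space (and hence across the presplit regions).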
      String suffix = "/" + shortTaskId;
      byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix));
      int BLOCK_SIZE = (int) (recordsToWrite / 100);
      Random rand = ThreadLocalRandom.current();

      for (long i = 0; i < recordsToWrite;) {
        long blockStart = i;
        for (long idxInBlock = 0; idxInBlock < BLOCK_SIZE
          && i < recordsToWrite; idxInBlock++, i++) {

          long byteSwapped = swapLong(i);
          Bytes.putLong(row, 0, byteSwapped);

          Put p = new Put(row);
          p.addColumn(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
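          // For every block after the first, add back references to random rows in the previous
          // block; each referred row's key is stored as a column qualifier so the verify job can
          // check that the referred row actually exists.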
          if (blockStart > 0) {
            for (int j = 0; j < numBackReferencesPerRow; j++) {
              long referredRow = blockStart - BLOCK_SIZE + rand.nextInt(BLOCK_SIZE);
              Bytes.putLong(row, 0, swapLong(referredRow));
              p.addColumn(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY);
            }
            refsWritten.increment(1);
          }
          rowsWritten.increment(1);
          mutator.mutate(p);

          if (i % 100 == 0) {
            context.setStatus("Written " + i + "/" + recordsToWrite + " records");
            context.progress();
          }
        }
        // End of block, flush all of them before we start writing anything
        // pointing to these!
        mutator.flush();
      }
    }
  }

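  /**
   * Scans the loaded table. For each row it emits an "existence" record (the row key mapped to an
   * empty value) for the row's own q1 cell, plus one (referred row key, referring row key) pair
   * per back-reference qualifier, so the reducer receives every referred row grouped with the
   * rows that reference it.
   */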
  public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
    static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      BytesWritable bwKey = new BytesWritable(key.get());
      BytesWritable bwVal = new BytesWritable();
      for (Cell kv : value.listCells()) {
        if (
          Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length, kv.getQualifierArray(),
            kv.getQualifierOffset(), kv.getQualifierLength()) == 0
        ) {
          context.write(bwKey, EMPTY);
        } else {
          bwVal.set(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
          context.write(bwVal, bwKey);
        }
      }
    }
  }

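  /**
   * For each referred row key, checks that the empty "existence" record emitted by
   * {@link VerifyMapper} is present among the grouped values. If it is not, the referred row is
   * missing: it is logged and written to the job output for post-mortem inspection.
   */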
  public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
    private Counter refsChecked;
    private Counter rowsWritten;

    @Override
    public void setup(Context context) throws IOException {
      refsChecked = context.getCounter(Counters.REFERENCES_CHECKED);
      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
    }

    @Override
    protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers,
      VerifyReducer.Context ctx) throws IOException, InterruptedException {
      boolean gotOriginalRow = false;
      int refCount = 0;

      for (BytesWritable ref : referrers) {
        if (ref.getLength() == 0) {
          assert !gotOriginalRow;
          gotOriginalRow = true;
        } else {
          refCount++;
        }
      }
      refsChecked.increment(refCount);

      if (!gotOriginalRow) {
        String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength());
        String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength());
        LOG.error("Reference error row " + parsedRow);
        ctx.write(new Text(binRow), new Text(parsedRow));
        rowsWritten.increment(1);
      }
    }

    private String makeRowReadable(byte[] bytes, int length) {
      long rowIdx = swapLong(Bytes.toLong(bytes, 0));
      String suffix = Bytes.toString(bytes, 8, length - 8);

      return "Row #" + rowIdx + " suffix " + suffix;
    }
  }

  protected Job doLoad(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "load-output");
    LOG.info("Load output dir: " + outputDir);

    NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
    conf.set(TABLE_NAME_KEY, tableDescriptor.getTableName().getNameAsString());

    Job job = Job.getInstance(conf);
    job.setJobName(TEST_NAME + " Load for " + tableDescriptor.getTableName());
    job.setJarByClass(this.getClass());
    setMapperClass(job);
    job.setInputFormatClass(NMapInputFormat.class);
    job.setNumReduceTasks(0);
    setJobScannerConf(job);
    FileOutputFormat.setOutputPath(job, outputDir);

    TableMapReduceUtil.addDependencyJars(job);

    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
    TableMapReduceUtil.initCredentials(job);
    assertTrue(job.waitForCompletion(true));
    return job;
  }

  protected void setMapperClass(Job job) {
    job.setMapperClass(LoadMapper.class);
  }

  protected void doVerify(Configuration conf, TableDescriptor tableDescriptor) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "verify-output");
    LOG.info("Verify output dir: " + outputDir);

    Job job = Job.getInstance(conf);
    job.setJarByClass(this.getClass());
    job.setJobName(TEST_NAME + " Verification for " + tableDescriptor.getTableName());
    setJobScannerConf(job);

    Scan scan = new Scan();

    TableMapReduceUtil.initTableMapperJob(tableDescriptor.getTableName().getNameAsString(), scan,
      VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
    int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
    TableMapReduceUtil.setScannerCaching(job, scannerCaching);

    job.setReducerClass(VerifyReducer.class);
    job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
    FileOutputFormat.setOutputPath(job, outputDir);
    assertTrue(job.waitForCompletion(true));

    long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
    assertEquals(0, numOutputRecords);
  }

  /**
   * Tool to search for missing rows in WALs and hfiles. Pass in a file or directory of keys to
   * search for. The key file must have been written by the verify step (we depend on the format
   * it writes out). We read the keys in and then search the hbase WALs and oldWALs dirs (some of
   * this is TODO).
   */
  public static class WALSearcher extends WALPlayer {
    public WALSearcher(Configuration conf) {
      super(conf);
    }

    /**
     * The actual searcher mapper.
     */
    public static class WALMapperSearcher extends WALMapper {
      private SortedSet<byte[]> keysToFind;
      private AtomicInteger rows = new AtomicInteger(0);

      @Override
      public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
        throws IOException {
        super.setup(context);
        try {
          this.keysToFind = readKeysToSearch(context.getConfiguration());
          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.toString());
        }
      }

      @Override
      protected boolean filter(Context context, Cell cell) {
        // TODO: Can I do a better compare than this copying out key?
        byte[] row = new byte[cell.getRowLength()];
        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
        boolean b = this.keysToFind.contains(row);
        if (b) {
          String keyStr = Bytes.toStringBinary(row);
          try {
            LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
          } catch (IOException | InterruptedException e) {
            LOG.warn(e.toString(), e);
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
          }
          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
        }
        return b;
      }
    }

    // Put in place the above WALMapperSearcher.
    @Override
    public Job createSubmittableJob(String[] args) throws IOException {
      Job job = super.createSubmittableJob(args);
      // Call my class instead.
      job.setJarByClass(WALMapperSearcher.class);
      job.setMapperClass(WALMapperSearcher.class);
      job.setOutputFormatClass(NullOutputFormat.class);
      return job;
    }
  }

  static final String FOUND_GROUP_KEY = "Found";
  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

  static SortedSet<byte[]> readKeysToSearch(final Configuration conf)
    throws IOException, InterruptedException {
    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
    FileSystem fs = FileSystem.get(conf);
    SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    if (!fs.exists(keysInputDir)) {
      throw new FileNotFoundException(keysInputDir.toString());
    }
    if (!fs.isDirectory(keysInputDir)) {
      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
      readFileToSearch(fs, keyFileStatus, result);
    } else {
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        // Skip "_SUCCESS" file.
        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
        readFileToSearch(fs, keyFileStatus, result);
      }
    }
    return result;
  }

  private static SortedSet<byte[]> readFileToSearch(final FileSystem fs,
    final FileStatus keyFileStatus, SortedSet<byte[]> result)
    throws IOException, InterruptedException {
    // The verify job uses the default file output format and writes <Text, Text> pairs, so each
    // line looks like "<row key in toStringBinary form>\t<readable description>" and can be read
    // as plain text.
    try (InputStream in = fs.open(keyFileStatus.getPath()); BufferedReader reader =
      new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
      // Extract the first field of each line and record it as a missing key.
      String line;
      while ((line = reader.readLine()) != null) {
        if (line.isEmpty()) continue;
        List<String> parts = Splitter.onPattern("\\s+").splitToList(line);
        if (parts.size() >= 1) {
          result.add(Bytes.toBytesBinary(Iterables.get(parts, 0)));
        } else {
          LOG.info("Cannot parse key from: " + line);
        }
      }
    }
    return result;
  }

  private int doSearch(String keysDir) throws Exception {
    Path inputDir = new Path(keysDir);

    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
    SortedSet<byte[]> keys = readKeysToSearch(getConf());
    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
    LOG.info("Count of keys to find: " + keys.size());
    for (byte[] key : keys)
      LOG.info("Key: " + Bytes.toStringBinary(key));
    // Now read all WALs. In two dirs. Presumes certain layout.
    Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
    Path oldWalsDir =
      new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
    LOG.info("Running Search with keys inputDir=" + inputDir + " against "
      + getConf().get(HConstants.HBASE_DIR));
    int ret = ToolRunner.run(new WALSearcher(getConf()), new String[] { walsDir.toString(), "" });
    if (ret != 0) return ret;
    return ToolRunner.run(new WALSearcher(getConf()), new String[] { oldWalsDir.toString(), "" });
  }

  private static void setJobScannerConf(Job job) {
    long lpr = job.getConfiguration().getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT) / 100;
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, (int) lpr);
  }

  public Path getTestDir(String testName, String subdir) throws IOException {
    Path testDir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = FileSystem.get(getConf());
    fs.deleteOnExit(testDir);

    return new Path(new Path(testDir, testName), subdir);
  }

  @Test
  public void testLoadAndVerify() throws Exception {
    TableDescriptor tableDescriptor =
      TableDescriptorBuilder.newBuilder(TableName.valueOf(TEST_NAME))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)).build();

    Admin admin = getTestingUtil(getConf()).getAdmin();
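    // Presplit into 40 regions spanning the full 8-byte row key space (0x00...00 through
    // 0xff...ff), matching the byte-swapped keys the load mapper writes.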
    admin.createTable(tableDescriptor, Bytes.toBytes(0L), Bytes.toBytes(-1L), 40);

    doLoad(getConf(), tableDescriptor);
    doVerify(getConf(), tableDescriptor);

    // Only disable and drop the table if verification succeeded - otherwise it's useful
    // to leave it around for post-mortem inspection.
    getTestingUtil(getConf()).deleteTable(tableDescriptor.getTableName());
  }

  @Override
  public void printUsage() {
    printUsage(this.getClass().getSimpleName() + " <options>"
      + " [-Doptions] <load|verify|loadAndVerify|search>", "Options", "");
    System.err.println("");
    System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
    System.err.println("Options");
    System.err.println("  -Dloadmapper.table=<name>        Table to write/verify"
      + " (default IntegrationTestLoadAndVerify)");
    System.err
      .println("  -Dloadmapper.backrefs=<n>        Number of backreferences per row (default 50)");
    System.err.println(
      "  -Dloadmapper.num_to_write=<n>    Number of rows to write per mapper (default 100,000)");
    System.err.println(
      "  -Dloadmapper.deleteAfter=<bool>  Delete after a successful verify (default true)");
    System.err.println(
      "  -Dloadmapper.numPresplits=<n>    Number of presplit regions to start with (default 40)");
    System.err
      .println("  -Dloadmapper.map.tasks=<n>       Number of map tasks for load (default 200)");
    System.err
      .println("  -Dverify.reduce.tasks=<n>        Number of reduce tasks for verify (default 35)");
    System.err.println(
      "  -Dverify.scannercaching=<n>      Number of rows for hbase scanner caching (default 500)");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    String[] args = cmd.getArgs();
    if (args == null || args.length < 1) {
      printUsage();
      throw new RuntimeException("Incorrect number of args.");
    }
    toRun = args[0];
    if (toRun.equalsIgnoreCase("search")) {
      if (args.length > 1) {
        keysDir = args[1];
      }
    }
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    IntegrationTestingUtility.setUseDistributedCluster(getConf());
    boolean doLoad = false;
    boolean doVerify = false;
    boolean doSearch = false;
    boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true);
    int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);

    if (toRun.equalsIgnoreCase("load")) {
      doLoad = true;
    } else if (toRun.equalsIgnoreCase("verify")) {
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
      doLoad = true;
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("search")) {
      doLoad = false;
      doVerify = false;
      doSearch = true;
      if (keysDir == null) {
        System.err.println("Usage: search <KEYS_DIR>");
        return 1;
      }
    } else {
      System.err.println("Invalid argument " + toRun);
      printUsage();
      return 1;
    }

    // Create a TableDescriptor for the specified table.
    TableName table = getTablename();
    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(table)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY)).build();

    if (doLoad) {
      try (Connection conn = ConnectionFactory.createConnection(getConf());
        Admin admin = conn.getAdmin()) {
        admin.createTable(tableDescriptor, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
        doLoad(getConf(), tableDescriptor);
      }
    }
    if (doVerify) {
      doVerify(getConf(), tableDescriptor);
      if (doDelete) {
        getTestingUtil(getConf()).deleteTable(tableDescriptor.getTableName());
      }
    }
    if (doSearch) {
      return doSearch(keysDir);
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return TableName.valueOf(getConf().get(TABLE_NAME_KEY, TEST_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(TEST_FAMILY));
  }

  public static void main(String argv[]) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(), argv);
    System.exit(ret);
  }
}