/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A large test which loads a lot of data that has internal references, and
 * verifies the data.
 *
 * In the load step, loadmapper.map.tasks map tasks (default 200) are launched, each of which
 * writes loadmapper.num_to_write (default 100K) rows to an HBase table. Rows are written in
 * blocks, for a total of 100 blocks per mapper. Each row in a block contains
 * loadmapper.backrefs (default 50) references to random rows in the previous block.
 *
 * The verify step scans the table and verifies that every referenced row is actually present
 * (no data loss). Failed rows are output from the reduce phase and saved in the job output
 * directory on HDFS for later inspection.
 *
 * This class can be run as a unit test, as an integration test, or from the command line.
 *
 * Originally taken from Apache Bigtop.
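 *
 * Example invocation from the command line (the property values shown are illustrative):
 * <pre>
 *   hbase org.apache.hadoop.hbase.test.IntegrationTestLoadAndVerify \
 *     -Dloadmapper.num_to_write=100000 -Dloadmapper.backrefs=50 loadAndVerify
 * </pre>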
 */
@Category(IntegrationTests.class)
public class IntegrationTestLoadAndVerify extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadAndVerify.class);

  private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");

  private static final String NUM_TO_WRITE_KEY = "loadmapper.num_to_write";
  private static final long NUM_TO_WRITE_DEFAULT = 100 * 1000;

  private static final String TABLE_NAME_KEY = "loadmapper.table";
  private static final String TABLE_NAME_DEFAULT = "table";

  private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs";
  private static final int NUM_BACKREFS_DEFAULT = 50;

  private static final String NUM_MAP_TASKS_KEY = "loadmapper.map.tasks";
  private static final String NUM_REDUCE_TASKS_KEY = "verify.reduce.tasks";
  private static final int NUM_MAP_TASKS_DEFAULT = 200;
  private static final int NUM_REDUCE_TASKS_DEFAULT = 35;

  private static final int SCANNER_CACHING = 500;

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private String toRun = null;
  private String keysDir = null;

  private enum Counters {
    ROWS_WRITTEN,
    REFERENCES_WRITTEN,
    REFERENCES_CHECKED
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(3);
    this.setConf(util.getConfiguration());
    if (!util.isDistributedCluster()) {
      getConf().setLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT / 100);
      getConf().setInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT / 100);
      getConf().setInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT / 10);
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  /**
   * Converts a "long" value between endian systems.
   * Borrowed from Apache Commons IO
   * @param value value to convert
   * @return the converted value
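   * For example, {@code swapLong(1L)} returns {@code 0x0100000000000000L}; the low-order
   * byte moves to the high-order position.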
   */
  public static long swapLong(long value)
  {
    return
      ( ( ( value >> 0 ) & 0xff ) << 56 ) +
      ( ( ( value >> 8 ) & 0xff ) << 48 ) +
      ( ( ( value >> 16 ) & 0xff ) << 40 ) +
      ( ( ( value >> 24 ) & 0xff ) << 32 ) +
      ( ( ( value >> 32 ) & 0xff ) << 24 ) +
      ( ( ( value >> 40 ) & 0xff ) << 16 ) +
      ( ( ( value >> 48 ) & 0xff ) << 8 ) +
      ( ( ( value >> 56 ) & 0xff ) << 0 );
  }

  public static class LoadMapper
      extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable>
  {
    protected long recordsToWrite;
    protected Connection connection;
    protected BufferedMutator mutator;
    protected Configuration conf;
    protected int numBackReferencesPerRow;

    protected String shortTaskId;

    protected Random rand = new Random();
    protected Counter rowsWritten, refsWritten;

    @Override
    public void setup(Context context) throws IOException {
      conf = context.getConfiguration();
      recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
      String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
      numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
      this.connection = ConnectionFactory.createConnection(conf);
      mutator = connection.getBufferedMutator(
          new BufferedMutatorParams(TableName.valueOf(tableName))
              .writeBufferSize(4 * 1024 * 1024));

      String taskId = conf.get("mapreduce.task.attempt.id");
      Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
      if (!matcher.matches()) {
        throw new RuntimeException("Strange task ID: " + taskId);
      }
      shortTaskId = matcher.group(1);

      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
      refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);
    }

    @Override
    public void cleanup(Context context) throws IOException {
      mutator.close();
      connection.close();
    }

    @Override
    protected void map(NullWritable key, NullWritable value,
        Context context) throws IOException, InterruptedException {

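      // Row key layout: 8 bytes of the byte-swapped record index, followed by "/" and this
      // mapper's short task id. Byte-swapping the index spreads sequentially written rows
      // across the keyspace (and hence across the pre-split regions).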
      String suffix = "/" + shortTaskId;
      byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix));

      int BLOCK_SIZE = (int)(recordsToWrite / 100);

      for (long i = 0; i < recordsToWrite;) {
        long blockStart = i;
        for (long idxInBlock = 0;
             idxInBlock < BLOCK_SIZE && i < recordsToWrite;
             idxInBlock++, i++) {

          long byteSwapped = swapLong(i);
          Bytes.putLong(row, 0, byteSwapped);

          Put p = new Put(row);
          p.addColumn(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
          if (blockStart > 0) {
            for (int j = 0; j < numBackReferencesPerRow; j++) {
              long referredRow = blockStart - BLOCK_SIZE + rand.nextInt(BLOCK_SIZE);
              Bytes.putLong(row, 0, swapLong(referredRow));
              p.addColumn(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY);
            }
            refsWritten.increment(1);
          }
          rowsWritten.increment(1);
          mutator.mutate(p);

          if (i % 100 == 0) {
            context.setStatus("Written " + i + "/" + recordsToWrite + " records");
            context.progress();
          }
        }
        // End of block, flush all of them before we start writing anything
        // pointing to these!
        mutator.flush();
      }
    }
  }

  public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
    static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
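      // For each scanned row, emit (rowKey, EMPTY) for the row's own marker cell and
      // (referencedRowKey, rowKey) for every backreference qualifier, so the reducer sees each
      // referenced row together with all of the rows that point at it.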
      BytesWritable bwKey = new BytesWritable(key.get());
      BytesWritable bwVal = new BytesWritable();
      for (Cell kv : value.listCells()) {
        if (Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length, kv.getQualifierArray(),
            kv.getQualifierOffset(), kv.getQualifierLength()) == 0) {
          context.write(bwKey, EMPTY);
        } else {
          bwVal.set(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
          context.write(bwVal, bwKey);
        }
      }
    }
  }

  public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
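    // REFERENCES_CHECKED counts backreferences that were checked. ROWS_WRITTEN is reused here to
    // count rows emitted by this reducer, i.e. referenced rows that were never found; the verify
    // job asserts that this counter stays at zero.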
    private Counter refsChecked;
    private Counter rowsWritten;

    @Override
    public void setup(Context context) throws IOException {
      refsChecked = context.getCounter(Counters.REFERENCES_CHECKED);
      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
    }

    @Override
    protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers,
        VerifyReducer.Context ctx) throws IOException, InterruptedException {
      boolean gotOriginalRow = false;
      int refCount = 0;

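      // An empty value is the marker emitted for the referred row itself; every other value is
      // the key of a row that references it.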
      for (BytesWritable ref : referrers) {
        if (ref.getLength() == 0) {
          assert !gotOriginalRow;
          gotOriginalRow = true;
        } else {
          refCount++;
        }
      }
      refsChecked.increment(refCount);

      if (!gotOriginalRow) {
        String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength());
        String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength());
        LOG.error("Reference error row " + parsedRow);
        ctx.write(new Text(binRow), new Text(parsedRow));
        rowsWritten.increment(1);
      }
    }

    private String makeRowReadable(byte[] bytes, int length) {
      long rowIdx = swapLong(Bytes.toLong(bytes, 0));
      String suffix = Bytes.toString(bytes, 8, length - 8);

      return "Row #" + rowIdx + " suffix " + suffix;
    }
  }

  protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "load-output");
    LOG.info("Load output dir: " + outputDir);

    NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
    conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());

    Job job = Job.getInstance(conf);
    job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
    job.setJarByClass(this.getClass());
    setMapperClass(job);
    job.setInputFormatClass(NMapInputFormat.class);
    job.setNumReduceTasks(0);
    setJobScannerConf(job);
    FileOutputFormat.setOutputPath(job, outputDir);

    TableMapReduceUtil.addDependencyJars(job);

    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
    TableMapReduceUtil.initCredentials(job);
    assertTrue(job.waitForCompletion(true));
    return job;
  }

  protected void setMapperClass(Job job) {
    job.setMapperClass(LoadMapper.class);
  }

  protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "verify-output");
    LOG.info("Verify output dir: " + outputDir);

    Job job = Job.getInstance(conf);
    job.setJarByClass(this.getClass());
    job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
    setJobScannerConf(job);

    Scan scan = new Scan();

    TableMapReduceUtil.initTableMapperJob(
        htd.getTableName().getNameAsString(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
    int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
    TableMapReduceUtil.setScannerCaching(job, scannerCaching);

    job.setReducerClass(VerifyReducer.class);
    job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
    FileOutputFormat.setOutputPath(job, outputDir);
    assertTrue(job.waitForCompletion(true));

    long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
    assertEquals(0, numOutputRecords);
  }

  /**
   * Tool to search for missing rows in WALs and HFiles.
   * Pass in a file or directory of keys to search for. The key file must have been written by
   * the verify step (we depend on the format it writes out): we read the keys in and then search
   * the cluster's WALs and oldWALs directories for them (some of this is TODO).
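   * For example, point the {@code search} command at the verify job's output directory
   * (the {@code verify-output} directory written by the verify step).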
   */
  public static class WALSearcher extends WALPlayer {
    public WALSearcher(Configuration conf) {
      super(conf);
    }

    /**
     * The actual searcher mapper.
     */
    public static class WALMapperSearcher extends WALMapper {
      private SortedSet<byte []> keysToFind;
      private AtomicInteger rows = new AtomicInteger(0);

      @Override
      public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
          throws IOException {
        super.setup(context);
        try {
          this.keysToFind = readKeysToSearch(context.getConfiguration());
          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.toString());
        }
      }

      @Override
      protected boolean filter(Context context, Cell cell) {
        // TODO: Can I do a better compare than this copying out key?
        byte [] row = new byte [cell.getRowLength()];
        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
        boolean b = this.keysToFind.contains(row);
        if (b) {
          String keyStr = Bytes.toStringBinary(row);
          try {
            LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
          } catch (IOException|InterruptedException e) {
            LOG.warn(e.toString(), e);
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
          }
          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
        }
        return b;
      }
    }

    // Put in place the above WALMapperSearcher.
    @Override
    public Job createSubmittableJob(String[] args) throws IOException {
      Job job = super.createSubmittableJob(args);
      // Call my class instead.
      job.setJarByClass(WALMapperSearcher.class);
      job.setMapperClass(WALMapperSearcher.class);
      job.setOutputFormatClass(NullOutputFormat.class);
      return job;
    }
  }

  static final String FOUND_GROUP_KEY = "Found";
  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

  static SortedSet<byte []> readKeysToSearch(final Configuration conf)
      throws IOException, InterruptedException {
    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
    FileSystem fs = FileSystem.get(conf);
    SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    if (!fs.exists(keysInputDir)) {
      throw new FileNotFoundException(keysInputDir.toString());
    }
    if (!fs.isDirectory(keysInputDir)) {
      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
      readFileToSearch(conf, fs, keyFileStatus, result);
    } else {
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        // Skip "_SUCCESS" file.
        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
        readFileToSearch(conf, fs, keyFileStatus, result);
      }
    }
    return result;
  }

  private static SortedSet<byte[]> readFileToSearch(final Configuration conf, final FileSystem fs,
      final FileStatus keyFileStatus, SortedSet<byte []> result)
      throws IOException, InterruptedException {
    // The verify job uses the default file output format and writes <Text, Text> pairs, so we
    // can read its output as a plain text file.
    try (InputStream in = fs.open(keyFileStatus.getPath());
        BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
      // Extract the key (the first whitespace-delimited token) from each line; each key is a
      // missing row reported by the verify step.
      String line;
      while ((line = reader.readLine()) != null) {
        if (line.isEmpty()) continue;

        String[] parts = line.split("\\s+");
        if (parts.length >= 1) {
          String key = parts[0];
          result.add(Bytes.toBytesBinary(key));
        } else {
          LOG.info("Cannot parse key from: " + line);
        }
      }
    }
    return result;
  }

  private int doSearch(Configuration conf, String keysDir) throws Exception {
    Path inputDir = new Path(keysDir);

    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
    SortedSet<byte []> keys = readKeysToSearch(getConf());
    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
    LOG.info("Count of keys to find: " + keys.size());
    for (byte [] key : keys) LOG.info("Key: " + Bytes.toStringBinary(key));
    // Now read all WALs. In two dirs. Presumes certain layout.
    Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
    Path oldWalsDir =
        new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
    LOG.info("Running Search with keys inputDir=" + inputDir +
      " against " + getConf().get(HConstants.HBASE_DIR));
    int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
    if (ret != 0) return ret;
    return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
  }

  private static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    long lpr = job.getConfiguration().getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT) / 100;
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, (int) lpr);
  }

  public Path getTestDir(String testName, String subdir) throws IOException {
    Path testDir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = FileSystem.get(getConf());
    fs.deleteOnExit(testDir);

    return new Path(new Path(testDir, testName), subdir);
  }

  @Test
  public void testLoadAndVerify() throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TEST_NAME));
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    Admin admin = getTestingUtil(getConf()).getAdmin();
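    // Pre-split the table across the full byte-swapped-long keyspace: start key 0L, end key -1L
    // (eight 0xFF bytes), 40 regions.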
    admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), 40);

    doLoad(getConf(), htd);
    doVerify(getConf(), htd);

    // Only disable and drop the table if verification succeeded; otherwise it is useful
    // to leave it around for post-mortem analysis.
    getTestingUtil(getConf()).deleteTable(htd.getTableName());
  }

  @Override
  public void printUsage() {
    printUsage(this.getClass().getSimpleName() + " <options>"
        + " [-Doptions] <load|verify|loadAndVerify|search>", "Options", "");
    System.err.println("");
    System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
    System.err.println("Options");
    System.err.println("  -Dloadmapper.table=<name>        Table to write/verify (default autogen)");
    System.err.println("  -Dloadmapper.backrefs=<n>        Number of backreferences per row (default 50)");
    System.err.println("  -Dloadmapper.num_to_write=<n>    Number of rows to write per mapper (default 100,000)");
    System.err.println("  -Dloadmapper.deleteAfter=<bool>  Delete the table after a successful verify (default true)");
    System.err.println("  -Dloadmapper.numPresplits=<n>    Number of presplit regions to start with (default 40)");
    System.err.println("  -Dloadmapper.map.tasks=<n>       Number of map tasks for the load (default 200)");
    System.err.println("  -Dverify.reduce.tasks=<n>        Number of reduce tasks for the verify (default 35)");
    System.err.println("  -Dverify.scannercaching=<n>      HBase scanner caching (rows per RPC) for the verify scan (default 500)");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    String[] args = cmd.getArgs();
    if (args == null || args.length < 1) {
      printUsage();
      throw new RuntimeException("Incorrect number of args.");
    }
    toRun = args[0];
    if (toRun.equalsIgnoreCase("search")) {
      if (args.length > 1) {
        keysDir = args[1];
      }
    }
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    IntegrationTestingUtility.setUseDistributedCluster(getConf());
    boolean doLoad = false;
    boolean doVerify = false;
    boolean doSearch = false;
    boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true);
    int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);

    if (toRun.equalsIgnoreCase("load")) {
      doLoad = true;
    } else if (toRun.equalsIgnoreCase("verify")) {
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
      doLoad = true;
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("search")) {
      doLoad = false;
      doVerify = false;
      doSearch = true;
      if (keysDir == null) {
        System.err.println("Usage: search <KEYS_DIR>");
        return 1;
      }
    } else {
      System.err.println("Invalid argument " + toRun);
      printUsage();
      return 1;
    }

    // create HTableDescriptor for specified table
    TableName table = getTablename();
    HTableDescriptor htd = new HTableDescriptor(table);
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    if (doLoad) {
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
        doLoad(getConf(), htd);
      }
    }
    if (doVerify) {
      doVerify(getConf(), htd);
      if (doDelete) {
        getTestingUtil(getConf()).deleteTable(htd.getTableName());
      }
    }
    if (doSearch) {
      return doSearch(getConf(), keysDir);
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return TableName.valueOf(getConf().get(TABLE_NAME_KEY, TEST_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(TEST_FAMILY));
  }

  public static void main(String[] argv) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(), argv);
    System.exit(ret);
  }
}