/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A large test which loads a lot of data that has internal references, and
 * verifies the data.
 *
 * In the load step, 200 map tasks are launched, each of which writes loadmapper.num_to_write
 * (default 100K) rows to an hbase table. Rows are written in 100 blocks; each row in a block
 * contains loadmapper.backrefs (default 50) references to random rows in the previous block.
 *
 * The verify step scans the table and checks that every referenced row is actually present
 * (no data loss). Failed rows are written from the reduce phase to the job output directory
 * in HDFS so they can be inspected later.
 *
 * This class can be run as a unit test, as an integration test, or from the command line.
 *
 * Originally taken from Apache Bigtop.
 */
@Category(IntegrationTests.class)
public class IntegrationTestLoadAndVerify extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadAndVerify.class);

  private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");

  private static final String NUM_TO_WRITE_KEY = "loadmapper.num_to_write";
  private static final long NUM_TO_WRITE_DEFAULT = 100*1000;

  private static final String TABLE_NAME_KEY = "loadmapper.table";
  private static final String TABLE_NAME_DEFAULT = "table";

  private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs";
  private static final int NUM_BACKREFS_DEFAULT = 50;

  private static final String NUM_MAP_TASKS_KEY = "loadmapper.map.tasks";
  private static final String NUM_REDUCE_TASKS_KEY = "verify.reduce.tasks";
  private static final int NUM_MAP_TASKS_DEFAULT = 200;
  private static final int NUM_REDUCE_TASKS_DEFAULT = 35;

  private static final int SCANNER_CACHING = 500;

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private String toRun = null;
  private String keysDir = null;

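  /** Counters tracked across the load and verify MapReduce jobs. */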
  private enum Counters {
    ROWS_WRITTEN,
    REFERENCES_WRITTEN,
    REFERENCES_CHECKED
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(3);
    this.setConf(util.getConfiguration());
    if (!util.isDistributedCluster()) {
      getConf().setLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT / 100);
      getConf().setInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT / 100);
      getConf().setInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT / 10);
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  /**
   * Converts a "long" value between endian systems.
   * Borrowed from Apache Commons IO
   * @param value value to convert
   * @return the converted value
   */
  public static long swapLong(long value)
  {
    return
      ( ( ( value >> 0 ) & 0xff ) << 56 ) +
      ( ( ( value >> 8 ) & 0xff ) << 48 ) +
      ( ( ( value >> 16 ) & 0xff ) << 40 ) +
      ( ( ( value >> 24 ) & 0xff ) << 32 ) +
      ( ( ( value >> 32 ) & 0xff ) << 24 ) +
      ( ( ( value >> 40 ) & 0xff ) << 16 ) +
      ( ( ( value >> 48 ) & 0xff ) << 8 ) +
      ( ( ( value >> 56 ) & 0xff ) << 0 );
  }

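  /**
   * Mapper for the load phase. Each task writes its rows in blocks; every row in a block after
   * the first also carries back-references (extra columns) to random rows in the previous block.
   */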
  public static class LoadMapper
      extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable>
  {
    protected long recordsToWrite;
    protected Connection connection;
    protected BufferedMutator mutator;
    protected Configuration conf;
    protected int numBackReferencesPerRow;

    protected String shortTaskId;

    protected Random rand = new Random();
    protected Counter rowsWritten, refsWritten;

    @Override
    public void setup(Context context) throws IOException {
      conf = context.getConfiguration();
      recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
      String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
      numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
      this.connection = ConnectionFactory.createConnection(conf);
      mutator = connection.getBufferedMutator(
          new BufferedMutatorParams(TableName.valueOf(tableName))
              .writeBufferSize(4 * 1024 * 1024));

      String taskId = conf.get("mapreduce.task.attempt.id");
      Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
      if (!matcher.matches()) {
        throw new RuntimeException("Strange task ID: " + taskId);
      }
      shortTaskId = matcher.group(1);

      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
      refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);
    }

    @Override
    public void cleanup(Context context) throws IOException {
      mutator.close();
      connection.close();
    }

    @Override
    protected void map(NullWritable key, NullWritable value,
        Context context) throws IOException, InterruptedException {

      // Row key layout: 8 bytes of byte-swapped row index followed by "/<shortTaskId>".
      String suffix = "/" + shortTaskId;
      byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix));

      int BLOCK_SIZE = (int)(recordsToWrite / 100);

      for (long i = 0; i < recordsToWrite;) {
        long blockStart = i;
        for (long idxInBlock = 0;
             idxInBlock < BLOCK_SIZE && i < recordsToWrite;
             idxInBlock++, i++) {

          long byteSwapped = swapLong(i);
          Bytes.putLong(row, 0, byteSwapped);

          Put p = new Put(row);
          p.addColumn(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
          if (blockStart > 0) {
            // Add back-references to random rows in the previous block; the referred row key
            // is stored as the column qualifier.
            for (int j = 0; j < numBackReferencesPerRow; j++) {
              long referredRow = blockStart - BLOCK_SIZE + rand.nextInt(BLOCK_SIZE);
              Bytes.putLong(row, 0, swapLong(referredRow));
              p.addColumn(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY);
            }
            refsWritten.increment(1);
          }
          rowsWritten.increment(1);
          mutator.mutate(p);

          if (i % 100 == 0) {
            context.setStatus("Written " + i + "/" + recordsToWrite + " records");
            context.progress();
          }
        }
        // End of block, flush all of them before we start writing anything
        // pointing to these!
        mutator.flush();
      }
    }
  }

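  /**
   * Mapper for the verify phase. For each row it emits (row, EMPTY) to mark that the row exists,
   * and for each back-reference column it emits (referred row, referring row).
   */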
  public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
    static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      BytesWritable bwKey = new BytesWritable(key.get());
      BytesWritable bwVal = new BytesWritable();
      for (Cell kv : value.listCells()) {
        if (Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length, kv.getQualifierArray(),
            kv.getQualifierOffset(), kv.getQualifierLength()) == 0) {
          context.write(bwKey, EMPTY);
        } else {
          bwVal.set(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
          context.write(bwVal, bwKey);
        }
      }
    }
  }

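  /**
   * Reducer for the verify phase. For each referred row, checks that the row itself was seen
   * (the EMPTY marker); if not, the missing row is written to the job output for inspection.
   */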
  public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
    private Counter refsChecked;
    private Counter rowsWritten;

    @Override
    public void setup(Context context) throws IOException {
      refsChecked = context.getCounter(Counters.REFERENCES_CHECKED);
      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
    }

    @Override
    protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers,
        VerifyReducer.Context ctx) throws IOException, InterruptedException {
      boolean gotOriginalRow = false;
      int refCount = 0;

      for (BytesWritable ref : referrers) {
        if (ref.getLength() == 0) {
          assert !gotOriginalRow;
          gotOriginalRow = true;
        } else {
          refCount++;
        }
      }
      refsChecked.increment(refCount);

      if (!gotOriginalRow) {
        String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength());
        String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength());
        LOG.error("Reference error row " + parsedRow);
        ctx.write(new Text(binRow), new Text(parsedRow));
        rowsWritten.increment(1);
      }
    }

    private String makeRowReadable(byte[] bytes, int length) {
      long rowIdx = swapLong(Bytes.toLong(bytes, 0));
      String suffix = Bytes.toString(bytes, 8, length - 8);

      return "Row #" + rowIdx + " suffix " + suffix;
    }
  }

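  /**
   * Runs the load MapReduce job against the given table and asserts that it completes
   * successfully.
   */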
  protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "load-output");
    LOG.info("Load output dir: " + outputDir);

    NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
    conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());

    Job job = Job.getInstance(conf);
    job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
    job.setJarByClass(this.getClass());
    setMapperClass(job);
    job.setInputFormatClass(NMapInputFormat.class);
    job.setNumReduceTasks(0);
    setJobScannerConf(job);
    FileOutputFormat.setOutputPath(job, outputDir);

    TableMapReduceUtil.addDependencyJars(job);

    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
    TableMapReduceUtil.initCredentials(job);
    assertTrue(job.waitForCompletion(true));
    return job;
  }

  protected void setMapperClass(Job job) {
    job.setMapperClass(LoadMapper.class);
  }

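  /**
   * Runs the verify MapReduce job and asserts that it found no missing rows.
   */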
  protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "verify-output");
    LOG.info("Verify output dir: " + outputDir);

    Job job = Job.getInstance(conf);
    job.setJarByClass(this.getClass());
    job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
    setJobScannerConf(job);

    Scan scan = new Scan();

    TableMapReduceUtil.initTableMapperJob(
        htd.getTableName().getNameAsString(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
    int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
    TableMapReduceUtil.setScannerCaching(job, scannerCaching);

    job.setReducerClass(VerifyReducer.class);
    job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
    FileOutputFormat.setOutputPath(job, outputDir);
    assertTrue(job.waitForCompletion(true));

    long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
    assertEquals(0, numOutputRecords);
  }

  /**
   * Tool to search for missing rows in WALs and hfiles.
   * Pass in a file or directory of keys to search for. The key file must have been written by
   * the verify step (we depend on the format it writes out). We read the keys in and then search
   * the hbase WALs and oldWALs dirs (some of this is TODO).
   */
  public static class WALSearcher extends WALPlayer {
    public WALSearcher(Configuration conf) {
      super(conf);
    }

    /**
     * The actual searcher mapper.
     */
    public static class WALMapperSearcher extends WALMapper {
      private SortedSet<byte []> keysToFind;
      private AtomicInteger rows = new AtomicInteger(0);

      @Override
      public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
          throws IOException {
        super.setup(context);
        try {
          this.keysToFind = readKeysToSearch(context.getConfiguration());
          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.toString());
        }
      }

      @Override
      protected boolean filter(Context context, Cell cell) {
        // TODO: Can I do a better compare than this copying out key?
        byte [] row = new byte [cell.getRowLength()];
        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
        boolean b = this.keysToFind.contains(row);
        if (b) {
          String keyStr = Bytes.toStringBinary(row);
          try {
            LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
          } catch (IOException|InterruptedException e) {
            LOG.warn(e.toString(), e);
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
          }
          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
        }
        return b;
      }
    }

    // Put in place the above WALMapperSearcher.
    @Override
    public Job createSubmittableJob(String[] args) throws IOException {
      Job job = super.createSubmittableJob(args);
      // Call my class instead.
      job.setJarByClass(WALMapperSearcher.class);
      job.setMapperClass(WALMapperSearcher.class);
      job.setOutputFormatClass(NullOutputFormat.class);
      return job;
    }
  }

  static final String FOUND_GROUP_KEY = "Found";
  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

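  /**
   * Reads the keys to search for, as written by the verify step, from a file or a directory of
   * files into a sorted set.
   */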
  static SortedSet<byte []> readKeysToSearch(final Configuration conf)
      throws IOException, InterruptedException {
    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
    FileSystem fs = FileSystem.get(conf);
    SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    if (!fs.exists(keysInputDir)) {
      throw new FileNotFoundException(keysInputDir.toString());
    }
    if (!fs.isDirectory(keysInputDir)) {
      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
      readFileToSearch(conf, fs, keyFileStatus, result);
    } else {
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        // Skip "_SUCCESS" file.
        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
        readFileToSearch(conf, fs, keyFileStatus, result);
      }
    }
    return result;
  }

  private static SortedSet<byte[]> readFileToSearch(final Configuration conf, final FileSystem fs,
      final FileStatus keyFileStatus, SortedSet<byte []> result)
      throws IOException, InterruptedException {
    // Verify uses a file output format writing <Text, Text>, so we can read it as a text file.
    try (InputStream in = fs.open(keyFileStatus.getPath());
        BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
      // The first token on each line is the missing row key.
      String line;
      while ((line = reader.readLine()) != null) {
        if (line.isEmpty()) continue;

        String[] parts = line.split("\\s+");
        if (parts.length >= 1) {
          String key = parts[0];
          result.add(Bytes.toBytesBinary(key));
        } else {
          LOG.info("Cannot parse key from: " + line);
        }
      }
    }
    return result;
  }

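  /**
   * Runs the WALSearcher over both the WALs and oldWALs directories, looking for the keys read
   * from keysDir.
   */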
  private int doSearch(Configuration conf, String keysDir) throws Exception {
    Path inputDir = new Path(keysDir);

    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
    SortedSet<byte []> keys = readKeysToSearch(getConf());
    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
    LOG.info("Count of keys to find: " + keys.size());
    for (byte [] key : keys) LOG.info("Key: " + Bytes.toStringBinary(key));
    // Now read all WALs. In two dirs. Presumes certain layout.
    Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
    Path oldWalsDir = new Path(
        CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
    LOG.info("Running Search with keys inputDir=" + inputDir +
      " against " + getConf().get(HConstants.HBASE_DIR));
    int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
    if (ret != 0) return ret;
    return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
  }

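  /** Configure the table record reader to log roughly once per 1% of the configured row count. */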
  private static void setJobScannerConf(Job job) {
    long lpr = job.getConfiguration().getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT) / 100;
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, (int)lpr);
  }

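  /** Returns an output path under the per-test data directory, which is marked for deletion on exit. */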
  public Path getTestDir(String testName, String subdir) throws IOException {
    Path testDir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = FileSystem.get(getConf());
    fs.deleteOnExit(testDir);

    return new Path(new Path(testDir, testName), subdir);
  }

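  /**
   * End-to-end test: creates a pre-split table, runs the load job, runs the verify job, and
   * drops the table only if verification succeeded.
   */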
  @Test
  public void testLoadAndVerify() throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TEST_NAME));
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    Admin admin = getTestingUtil(getConf()).getAdmin();
    admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), 40);

    doLoad(getConf(), htd);
    doVerify(getConf(), htd);

    // Only disable and drop the table if verification succeeded; otherwise it is useful
    // to leave it around for post-mortem analysis.
    getTestingUtil(getConf()).deleteTable(htd.getTableName());
  }

  @Override
  public void printUsage() {
    printUsage(this.getClass().getSimpleName() + " <options>"
        + " [-Doptions] <load|verify|loadAndVerify|search>", "Options", "");
    System.err.println("");
    System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
    System.err.println("Options");
    System.err.println("  -Dloadmapper.table=<name>        Table to write/verify (default autogen)");
    System.err.println("  -Dloadmapper.backrefs=<n>        Number of backreferences per row (default 50)");
    System.err.println("  -Dloadmapper.num_to_write=<n>    Number of rows per mapper (default 100,000 per mapper)");
    System.err.println("  -Dloadmapper.deleteAfter=<bool>  Delete after a successful verify (default true)");
    System.err.println("  -Dloadmapper.numPresplits=<n>    Number of presplit regions to start with (default 40)");
    System.err.println("  -Dloadmapper.map.tasks=<n>       Number of map tasks for load (default 200)");
    System.err.println("  -Dverify.reduce.tasks=<n>        Number of reduce tasks for verify (default 35)");
    System.err.println("  -Dverify.scannercaching=<n>      Number of rows for hbase scanner caching (default 500)");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    String[] args = cmd.getArgs();
    if (args == null || args.length < 1) {
      printUsage();
      throw new RuntimeException("Incorrect number of args.");
    }
    toRun = args[0];
    if (toRun.equalsIgnoreCase("search")) {
      if (args.length > 1) {
        keysDir = args[1];
      }
    }
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    IntegrationTestingUtility.setUseDistributedCluster(getConf());
    boolean doLoad = false;
    boolean doVerify = false;
    boolean doSearch = false;
    boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true);
    int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);

    if (toRun.equalsIgnoreCase("load")) {
      doLoad = true;
    } else if (toRun.equalsIgnoreCase("verify")) {
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
      doLoad = true;
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("search")) {
      doLoad = false;
      doVerify = false;
      doSearch = true;
      if (keysDir == null) {
        System.err.println("Usage: search <KEYS_DIR>");
        return 1;
      }
    } else {
      System.err.println("Invalid argument " + toRun);
      printUsage();
      return 1;
    }

    // Create an HTableDescriptor for the specified table.
    TableName table = getTablename();
    HTableDescriptor htd = new HTableDescriptor(table);
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    if (doLoad) {
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
        doLoad(getConf(), htd);
      }
    }
    if (doVerify) {
      doVerify(getConf(), htd);
      if (doDelete) {
        getTestingUtil(getConf()).deleteTable(htd.getTableName());
      }
    }
    if (doSearch) {
      return doSearch(getConf(), keysDir);
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return TableName.valueOf(getConf().get(TABLE_NAME_KEY, TEST_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(TEST_FAMILY));
  }

  public static void main(String argv[]) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(), argv);
    System.exit(ret);
  }
}