001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.test; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.BufferedReader; 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.InputStreamReader; 028import java.io.InterruptedIOException; 029import java.nio.charset.StandardCharsets; 030import java.util.List; 031import java.util.Random; 032import java.util.Set; 033import java.util.SortedSet; 034import java.util.TreeSet; 035import java.util.concurrent.ThreadLocalRandom; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.regex.Matcher; 038import java.util.regex.Pattern; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.LocatedFileStatus; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.fs.RemoteIterator; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.HBaseConfiguration; 047import org.apache.hadoop.hbase.HColumnDescriptor; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.HTableDescriptor; 050import org.apache.hadoop.hbase.IntegrationTestBase; 051import org.apache.hadoop.hbase.IntegrationTestingUtility; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.client.BufferedMutator; 055import org.apache.hadoop.hbase.client.BufferedMutatorParams; 056import org.apache.hadoop.hbase.client.Connection; 057import org.apache.hadoop.hbase.client.ConnectionFactory; 058import org.apache.hadoop.hbase.client.Mutation; 059import org.apache.hadoop.hbase.client.Put; 060import org.apache.hadoop.hbase.client.Result; 061import org.apache.hadoop.hbase.client.Scan; 062import org.apache.hadoop.hbase.client.ScannerCallable; 063import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 064import org.apache.hadoop.hbase.mapreduce.NMapInputFormat; 065import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 066import org.apache.hadoop.hbase.mapreduce.TableMapper; 067import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl; 068import org.apache.hadoop.hbase.mapreduce.WALPlayer; 069import org.apache.hadoop.hbase.testclassification.IntegrationTests; 070import org.apache.hadoop.hbase.util.AbstractHBaseTool; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.CommonFSUtils; 073import org.apache.hadoop.hbase.wal.WALEdit; 074import org.apache.hadoop.hbase.wal.WALKey; 075import org.apache.hadoop.io.BytesWritable; 076import org.apache.hadoop.io.NullWritable; 077import org.apache.hadoop.io.Text; 078import org.apache.hadoop.mapreduce.Counter; 079import org.apache.hadoop.mapreduce.Job; 080import org.apache.hadoop.mapreduce.Mapper; 081import org.apache.hadoop.mapreduce.Reducer; 082import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 083import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 084import org.apache.hadoop.util.ToolRunner; 085import org.junit.Test; 086import org.junit.experimental.categories.Category; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 091import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 092import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 093import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 094 095/** 096 * A large test which loads a lot of data that has internal references, and verifies the data. In 097 * load step, 200 map tasks are launched, which in turn write loadmapper.num_to_write (default 100K) 098 * rows to an hbase table. Rows are written in blocks, for a total of 100 blocks. Each row in a 099 * block, contains loadmapper.backrefs (default 50) references to random rows in the prev block. 100 * Verify step is scans the table, and verifies that for every referenced row, the row is actually 101 * there (no data loss). Failed rows are output from reduce to be saved in the job output dir in 102 * hdfs and inspected later. This class can be run as a unit test, as an integration test, or from 103 * the command line Originally taken from Apache Bigtop. 104 */ 105@Category(IntegrationTests.class) 106public class IntegrationTestLoadAndVerify extends IntegrationTestBase { 107 108 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadAndVerify.class); 109 110 private static final String TEST_NAME = "IntegrationTestLoadAndVerify"; 111 private static final byte[] TEST_FAMILY = Bytes.toBytes("f1"); 112 private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1"); 113 114 private static final String NUM_TO_WRITE_KEY = "loadmapper.num_to_write"; 115 private static final long NUM_TO_WRITE_DEFAULT = 100 * 1000; 116 117 private static final String TABLE_NAME_KEY = "loadmapper.table"; 118 private static final String TABLE_NAME_DEFAULT = "table"; 119 120 private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs"; 121 private static final int NUM_BACKREFS_DEFAULT = 50; 122 123 private static final String NUM_MAP_TASKS_KEY = "loadmapper.map.tasks"; 124 private static final String NUM_REDUCE_TASKS_KEY = "verify.reduce.tasks"; 125 private static final int NUM_MAP_TASKS_DEFAULT = 200; 126 private static final int NUM_REDUCE_TASKS_DEFAULT = 35; 127 128 private static final int SCANNER_CACHING = 500; 129 130 private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters 131 132 private String toRun = null; 133 private String keysDir = null; 134 135 private enum Counters { 136 ROWS_WRITTEN, 137 REFERENCES_WRITTEN, 138 REFERENCES_CHECKED 139 } 140 141 @Override 142 public void setUpCluster() throws Exception { 143 util = getTestingUtil(getConf()); 144 util.initializeCluster(3); 145 this.setConf(util.getConfiguration()); 146 if (!util.isDistributedCluster()) { 147 getConf().setLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT / 100); 148 getConf().setInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT / 100); 149 getConf().setInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT / 10); 150 util.startMiniMapReduceCluster(); 151 } 152 } 153 154 @Override 155 public void cleanUpCluster() throws Exception { 156 super.cleanUpCluster(); 157 if (!util.isDistributedCluster()) { 158 util.shutdownMiniMapReduceCluster(); 159 } 160 } 161 162 /** 163 * Converts a "long" value between endian systems. Borrowed from Apache Commons IO 164 * @param value value to convert 165 * @return the converted value 166 */ 167 public static long swapLong(long value) { 168 return (((value >> 0) & 0xff) << 56) + (((value >> 8) & 0xff) << 48) 169 + (((value >> 16) & 0xff) << 40) + (((value >> 24) & 0xff) << 32) 170 + (((value >> 32) & 0xff) << 24) + (((value >> 40) & 0xff) << 16) 171 + (((value >> 48) & 0xff) << 8) + (((value >> 56) & 0xff) << 0); 172 } 173 174 public static class LoadMapper 175 extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable> { 176 protected long recordsToWrite; 177 protected Connection connection; 178 protected BufferedMutator mutator; 179 protected Configuration conf; 180 protected int numBackReferencesPerRow; 181 protected String shortTaskId; 182 protected Counter rowsWritten, refsWritten; 183 184 @Override 185 public void setup(Context context) throws IOException { 186 conf = context.getConfiguration(); 187 recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT); 188 String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT); 189 numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT); 190 this.connection = ConnectionFactory.createConnection(conf); 191 mutator = connection.getBufferedMutator( 192 new BufferedMutatorParams(TableName.valueOf(tableName)).writeBufferSize(4 * 1024 * 1024)); 193 194 String taskId = conf.get("mapreduce.task.attempt.id"); 195 Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId); 196 if (!matcher.matches()) { 197 throw new RuntimeException("Strange task ID: " + taskId); 198 } 199 shortTaskId = matcher.group(1); 200 201 rowsWritten = context.getCounter(Counters.ROWS_WRITTEN); 202 refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN); 203 } 204 205 @Override 206 public void cleanup(Context context) throws IOException { 207 mutator.close(); 208 connection.close(); 209 } 210 211 @Override 212 protected void map(NullWritable key, NullWritable value, Context context) 213 throws IOException, InterruptedException { 214 215 String suffix = "/" + shortTaskId; 216 byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix)); 217 int BLOCK_SIZE = (int) (recordsToWrite / 100); 218 Random rand = ThreadLocalRandom.current(); 219 220 for (long i = 0; i < recordsToWrite;) { 221 long blockStart = i; 222 for (long idxInBlock = 0; idxInBlock < BLOCK_SIZE 223 && i < recordsToWrite; idxInBlock++, i++) { 224 225 long byteSwapped = swapLong(i); 226 Bytes.putLong(row, 0, byteSwapped); 227 228 Put p = new Put(row); 229 p.addColumn(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY); 230 if (blockStart > 0) { 231 for (int j = 0; j < numBackReferencesPerRow; j++) { 232 long referredRow = blockStart - BLOCK_SIZE + rand.nextInt(BLOCK_SIZE); 233 Bytes.putLong(row, 0, swapLong(referredRow)); 234 p.addColumn(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY); 235 } 236 refsWritten.increment(1); 237 } 238 rowsWritten.increment(1); 239 mutator.mutate(p); 240 241 if (i % 100 == 0) { 242 context.setStatus("Written " + i + "/" + recordsToWrite + " records"); 243 context.progress(); 244 } 245 } 246 // End of block, flush all of them before we start writing anything 247 // pointing to these! 248 mutator.flush(); 249 } 250 } 251 } 252 253 public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> { 254 static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY); 255 256 @Override 257 protected void map(ImmutableBytesWritable key, Result value, Context context) 258 throws IOException, InterruptedException { 259 BytesWritable bwKey = new BytesWritable(key.get()); 260 BytesWritable bwVal = new BytesWritable(); 261 for (Cell kv : value.listCells()) { 262 if ( 263 Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length, kv.getQualifierArray(), 264 kv.getQualifierOffset(), kv.getQualifierLength()) == 0 265 ) { 266 context.write(bwKey, EMPTY); 267 } else { 268 bwVal.set(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()); 269 context.write(bwVal, bwKey); 270 } 271 } 272 } 273 } 274 275 public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> { 276 private Counter refsChecked; 277 private Counter rowsWritten; 278 279 @Override 280 public void setup(Context context) throws IOException { 281 refsChecked = context.getCounter(Counters.REFERENCES_CHECKED); 282 rowsWritten = context.getCounter(Counters.ROWS_WRITTEN); 283 } 284 285 @Override 286 protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers, 287 VerifyReducer.Context ctx) throws IOException, InterruptedException { 288 boolean gotOriginalRow = false; 289 int refCount = 0; 290 291 for (BytesWritable ref : referrers) { 292 if (ref.getLength() == 0) { 293 assert !gotOriginalRow; 294 gotOriginalRow = true; 295 } else { 296 refCount++; 297 } 298 } 299 refsChecked.increment(refCount); 300 301 if (!gotOriginalRow) { 302 String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength()); 303 String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength()); 304 LOG.error("Reference error row " + parsedRow); 305 ctx.write(new Text(binRow), new Text(parsedRow)); 306 rowsWritten.increment(1); 307 } 308 } 309 310 private String makeRowReadable(byte[] bytes, int length) { 311 long rowIdx = swapLong(Bytes.toLong(bytes, 0)); 312 String suffix = Bytes.toString(bytes, 8, length - 8); 313 314 return "Row #" + rowIdx + " suffix " + suffix; 315 } 316 } 317 318 protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception { 319 Path outputDir = getTestDir(TEST_NAME, "load-output"); 320 LOG.info("Load output dir: " + outputDir); 321 322 NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT)); 323 conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString()); 324 325 Job job = Job.getInstance(conf); 326 job.setJobName(TEST_NAME + " Load for " + htd.getTableName()); 327 job.setJarByClass(this.getClass()); 328 setMapperClass(job); 329 job.setInputFormatClass(NMapInputFormat.class); 330 job.setNumReduceTasks(0); 331 setJobScannerConf(job); 332 FileOutputFormat.setOutputPath(job, outputDir); 333 334 TableMapReduceUtil.addDependencyJars(job); 335 336 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class); 337 TableMapReduceUtil.initCredentials(job); 338 assertTrue(job.waitForCompletion(true)); 339 return job; 340 } 341 342 protected void setMapperClass(Job job) { 343 job.setMapperClass(LoadMapper.class); 344 } 345 346 protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception { 347 Path outputDir = getTestDir(TEST_NAME, "verify-output"); 348 LOG.info("Verify output dir: " + outputDir); 349 350 Job job = Job.getInstance(conf); 351 job.setJarByClass(this.getClass()); 352 job.setJobName(TEST_NAME + " Verification for " + htd.getTableName()); 353 setJobScannerConf(job); 354 355 Scan scan = new Scan(); 356 357 TableMapReduceUtil.initTableMapperJob(htd.getTableName().getNameAsString(), scan, 358 VerifyMapper.class, BytesWritable.class, BytesWritable.class, job); 359 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class); 360 int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING); 361 TableMapReduceUtil.setScannerCaching(job, scannerCaching); 362 363 job.setReducerClass(VerifyReducer.class); 364 job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT)); 365 FileOutputFormat.setOutputPath(job, outputDir); 366 assertTrue(job.waitForCompletion(true)); 367 368 long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue(); 369 assertEquals(0, numOutputRecords); 370 } 371 372 /** 373 * Tool to search missing rows in WALs and hfiles. Pass in file or dir of keys to search for. Key 374 * file must have been written by Verify step (we depend on the format it writes out. We'll read 375 * them in and then search in hbase WALs and oldWALs dirs (Some of this is TODO). 376 */ 377 public static class WALSearcher extends WALPlayer { 378 public WALSearcher(Configuration conf) { 379 super(conf); 380 } 381 382 /** 383 * The actual searcher mapper. 384 */ 385 public static class WALMapperSearcher extends WALMapper { 386 private SortedSet<byte[]> keysToFind; 387 private AtomicInteger rows = new AtomicInteger(0); 388 389 @Override 390 public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) 391 throws IOException { 392 super.setup(context); 393 try { 394 this.keysToFind = readKeysToSearch(context.getConfiguration()); 395 LOG.info("Loaded keys to find: count=" + this.keysToFind.size()); 396 } catch (InterruptedException e) { 397 throw new InterruptedIOException(e.toString()); 398 } 399 } 400 401 @Override 402 protected boolean filter(Context context, Cell cell) { 403 // TODO: Can I do a better compare than this copying out key? 404 byte[] row = new byte[cell.getRowLength()]; 405 System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength()); 406 boolean b = this.keysToFind.contains(row); 407 if (b) { 408 String keyStr = Bytes.toStringBinary(row); 409 try { 410 LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey()); 411 } catch (IOException | InterruptedException e) { 412 LOG.warn(e.toString(), e); 413 } 414 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 415 context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); 416 } 417 context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1); 418 } 419 return b; 420 } 421 } 422 423 // Put in place the above WALMapperSearcher. 424 @Override 425 public Job createSubmittableJob(String[] args) throws IOException { 426 Job job = super.createSubmittableJob(args); 427 // Call my class instead. 428 job.setJarByClass(WALMapperSearcher.class); 429 job.setMapperClass(WALMapperSearcher.class); 430 job.setOutputFormatClass(NullOutputFormat.class); 431 return job; 432 } 433 } 434 435 static final String FOUND_GROUP_KEY = "Found"; 436 static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir"; 437 438 static SortedSet<byte[]> readKeysToSearch(final Configuration conf) 439 throws IOException, InterruptedException { 440 Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY)); 441 FileSystem fs = FileSystem.get(conf); 442 SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 443 if (!fs.exists(keysInputDir)) { 444 throw new FileNotFoundException(keysInputDir.toString()); 445 } 446 if (!fs.isDirectory(keysInputDir)) { 447 FileStatus keyFileStatus = fs.getFileStatus(keysInputDir); 448 readFileToSearch(fs, keyFileStatus, result); 449 } else { 450 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false); 451 while (iterator.hasNext()) { 452 LocatedFileStatus keyFileStatus = iterator.next(); 453 // Skip "_SUCCESS" file. 454 if (keyFileStatus.getPath().getName().startsWith("_")) continue; 455 readFileToSearch(fs, keyFileStatus, result); 456 } 457 } 458 return result; 459 } 460 461 private static SortedSet<byte[]> readFileToSearch(final FileSystem fs, 462 final FileStatus keyFileStatus, SortedSet<byte[]> result) 463 throws IOException, InterruptedException { 464 // verify uses file output format and writes <Text, Text>. We can read it as a text file 465 try (InputStream in = fs.open(keyFileStatus.getPath()); BufferedReader reader = 466 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) { 467 // extract out the key and return that missing as a missing key 468 String line; 469 while ((line = reader.readLine()) != null) { 470 if (line.isEmpty()) continue; 471 List<String> parts = Splitter.onPattern("\\s+").splitToList(line); 472 if (parts.size() >= 1) { 473 result.add(Bytes.toBytesBinary(Iterables.get(parts, 0))); 474 } else { 475 LOG.info("Cannot parse key from: " + line); 476 } 477 } 478 } 479 return result; 480 } 481 482 private int doSearch(String keysDir) throws Exception { 483 Path inputDir = new Path(keysDir); 484 485 getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString()); 486 SortedSet<byte[]> keys = readKeysToSearch(getConf()); 487 if (keys.isEmpty()) throw new RuntimeException("No keys to find"); 488 LOG.info("Count of keys to find: " + keys.size()); 489 for (byte[] key : keys) 490 LOG.info("Key: " + Bytes.toStringBinary(key)); 491 // Now read all WALs. In two dirs. Presumes certain layout. 492 Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME); 493 Path oldWalsDir = 494 new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME); 495 LOG.info("Running Search with keys inputDir=" + inputDir + " against " 496 + getConf().get(HConstants.HBASE_DIR)); 497 int ret = ToolRunner.run(new WALSearcher(getConf()), new String[] { walsDir.toString(), "" }); 498 if (ret != 0) return ret; 499 return ToolRunner.run(new WALSearcher(getConf()), new String[] { oldWalsDir.toString(), "" }); 500 } 501 502 private static void setJobScannerConf(Job job) { 503 // Make sure scanners log something useful to make debugging possible. 504 job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true); 505 long lpr = job.getConfiguration().getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT) / 100; 506 job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, (int) lpr); 507 } 508 509 public Path getTestDir(String testName, String subdir) throws IOException { 510 Path testDir = util.getDataTestDirOnTestFS(testName); 511 FileSystem fs = FileSystem.get(getConf()); 512 fs.deleteOnExit(testDir); 513 514 return new Path(new Path(testDir, testName), subdir); 515 } 516 517 @Test 518 public void testLoadAndVerify() throws Exception { 519 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TEST_NAME)); 520 htd.addFamily(new HColumnDescriptor(TEST_FAMILY)); 521 522 Admin admin = getTestingUtil(getConf()).getAdmin(); 523 admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), 40); 524 525 doLoad(getConf(), htd); 526 doVerify(getConf(), htd); 527 528 // Only disable and drop if we succeeded to verify - otherwise it's useful 529 // to leave it around for post-mortem 530 getTestingUtil(getConf()).deleteTable(htd.getTableName()); 531 } 532 533 @Override 534 public void printUsage() { 535 printUsage(this.getClass().getSimpleName() + " <options>" 536 + " [-Doptions] <load|verify|loadAndVerify|search>", "Options", ""); 537 System.err.println(""); 538 System.err.println(" Loads a table with row dependencies and verifies the dependency chains"); 539 System.err.println("Options"); 540 System.err 541 .println(" -Dloadmapper.table=<name> Table to write/verify (default autogen)"); 542 System.err 543 .println(" -Dloadmapper.backrefs=<n> Number of backreferences per row (default 50)"); 544 System.err.println( 545 " -Dloadmapper.num_to_write=<n> Number of rows per mapper (default 100,000 per mapper)"); 546 System.err.println( 547 " -Dloadmapper.deleteAfter=<bool> Delete after a successful verify (default true)"); 548 System.err.println( 549 " -Dloadmapper.numPresplits=<n> Number of presplit regions to start with (default 40)"); 550 System.err 551 .println(" -Dloadmapper.map.tasks=<n> Number of map tasks for load (default 200)"); 552 System.err 553 .println(" -Dverify.reduce.tasks=<n> Number of reduce tasks for verify (default 35)"); 554 System.err.println( 555 " -Dverify.scannercaching=<n> Number hbase scanner caching rows to read (default 50)"); 556 } 557 558 @Override 559 protected void processOptions(CommandLine cmd) { 560 super.processOptions(cmd); 561 562 String[] args = cmd.getArgs(); 563 if (args == null || args.length < 1) { 564 printUsage(); 565 throw new RuntimeException("Incorrect Number of args."); 566 } 567 toRun = args[0]; 568 if (toRun.equalsIgnoreCase("search")) { 569 if (args.length > 1) { 570 keysDir = args[1]; 571 } 572 } 573 } 574 575 @Override 576 public int runTestFromCommandLine() throws Exception { 577 IntegrationTestingUtility.setUseDistributedCluster(getConf()); 578 boolean doLoad = false; 579 boolean doVerify = false; 580 boolean doSearch = false; 581 boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true); 582 int numPresplits = getConf().getInt("loadmapper.numPresplits", 40); 583 584 if (toRun.equalsIgnoreCase("load")) { 585 doLoad = true; 586 } else if (toRun.equalsIgnoreCase("verify")) { 587 doVerify = true; 588 } else if (toRun.equalsIgnoreCase("loadAndVerify")) { 589 doLoad = true; 590 doVerify = true; 591 } else if (toRun.equalsIgnoreCase("search")) { 592 doLoad = false; 593 doVerify = false; 594 doSearch = true; 595 if (keysDir == null) { 596 System.err.println("Usage: search <KEYS_DIR>]"); 597 return 1; 598 } 599 } else { 600 System.err.println("Invalid argument " + toRun); 601 printUsage(); 602 return 1; 603 } 604 605 // create HTableDescriptor for specified table 606 TableName table = getTablename(); 607 HTableDescriptor htd = new HTableDescriptor(table); 608 htd.addFamily(new HColumnDescriptor(TEST_FAMILY)); 609 610 if (doLoad) { 611 try (Connection conn = ConnectionFactory.createConnection(getConf()); 612 Admin admin = conn.getAdmin()) { 613 admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits); 614 doLoad(getConf(), htd); 615 } 616 } 617 if (doVerify) { 618 doVerify(getConf(), htd); 619 if (doDelete) { 620 getTestingUtil(getConf()).deleteTable(htd.getTableName()); 621 } 622 } 623 if (doSearch) { 624 return doSearch(keysDir); 625 } 626 return 0; 627 } 628 629 @Override 630 public TableName getTablename() { 631 return TableName.valueOf(getConf().get(TABLE_NAME_KEY, TEST_NAME)); 632 } 633 634 @Override 635 protected Set<String> getColumnFamilies() { 636 return Sets.newHashSet(Bytes.toString(TEST_FAMILY)); 637 } 638 639 public static void main(String argv[]) throws Exception { 640 Configuration conf = HBaseConfiguration.create(); 641 IntegrationTestingUtility.setUseDistributedCluster(conf); 642 int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(), argv); 643 System.exit(ret); 644 } 645}