/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A large test which loads a lot of data that has internal references, and
 * verifies the data.
 *
 * In the load step, 200 map tasks are launched, each of which writes loadmapper.num_to_write
 * (default 100K) rows to an hbase table. Rows are written in 100 blocks; each row in a block
 * contains loadmapper.backrefs (default 50) references to random rows in the previous block.
 *
 * The verify step scans the table and checks that every referenced row is actually present
 * (i.e. no data loss). Failed rows are written by the reduce step to the job output dir in
 * HDFS so they can be inspected later.
 *
 * This class can be run as a unit test, as an integration test, or from the command line.
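 *
 * For example, from the command line (assuming the integration test classes are on the
 * HBase classpath; the exact invocation varies by install and version):
 *
 *   hbase org.apache.hadoop.hbase.test.IntegrationTestLoadAndVerify loadAndVerify
 *   hbase org.apache.hadoop.hbase.test.IntegrationTestLoadAndVerify search KEYS_DIR
 *
 * where KEYS_DIR is a file or directory of missing keys written by the verify step.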
 *
 * Originally taken from Apache Bigtop.
 */
@Category(IntegrationTests.class)
public class IntegrationTestLoadAndVerify extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadAndVerify.class);

  private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");

  private static final String NUM_TO_WRITE_KEY = "loadmapper.num_to_write";
  private static final long NUM_TO_WRITE_DEFAULT = 100 * 1000;

  private static final String TABLE_NAME_KEY = "loadmapper.table";
  private static final String TABLE_NAME_DEFAULT = "table";

  private static final String NUM_BACKREFS_KEY = "loadmapper.backrefs";
  private static final int NUM_BACKREFS_DEFAULT = 50;

  private static final String NUM_MAP_TASKS_KEY = "loadmapper.map.tasks";
  private static final String NUM_REDUCE_TASKS_KEY = "verify.reduce.tasks";
  private static final int NUM_MAP_TASKS_DEFAULT = 200;
  private static final int NUM_REDUCE_TASKS_DEFAULT = 35;

  private static final int SCANNER_CACHING = 500;

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private String toRun = null;
  private String keysDir = null;

  private enum Counters {
    ROWS_WRITTEN,
    REFERENCES_WRITTEN,
    REFERENCES_CHECKED
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(3);
    this.setConf(util.getConfiguration());
    if (!util.isDistributedCluster()) {
      getConf().setLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT / 100);
      getConf().setInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT / 100);
      getConf().setInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT / 10);
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (!util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

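  // Row keys written by LoadMapper lead with a byte-swapped row index (see swapLong()
  // below): storing the index effectively little-endian puts its fastest-changing byte
  // first, so sequentially numbered rows scatter across the pre-split key space instead
  // of piling into a single region.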
  /**
   * Converts a "long" value between endian systems.
   * Borrowed from Apache Commons IO
   * @param value value to convert
   * @return the converted value
   */
  public static long swapLong(long value) {
    return
      ( ( ( value >> 0 ) & 0xff ) << 56 ) +
      ( ( ( value >> 8 ) & 0xff ) << 48 ) +
      ( ( ( value >> 16 ) & 0xff ) << 40 ) +
      ( ( ( value >> 24 ) & 0xff ) << 32 ) +
      ( ( ( value >> 32 ) & 0xff ) << 24 ) +
      ( ( ( value >> 40 ) & 0xff ) << 16 ) +
      ( ( ( value >> 48 ) & 0xff ) << 8 ) +
      ( ( ( value >> 56 ) & 0xff ) << 0 );
  }

  public static class LoadMapper
      extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {
    protected long recordsToWrite;
    protected Connection connection;
    protected BufferedMutator mutator;
    protected Configuration conf;
    protected int numBackReferencesPerRow;

    protected String shortTaskId;

    protected Random rand = new Random();
    protected Counter rowsWritten, refsWritten;

    @Override
    public void setup(Context context) throws IOException {
      conf = context.getConfiguration();
      recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT);
      String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
      numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT);
      this.connection = ConnectionFactory.createConnection(conf);
      mutator = connection.getBufferedMutator(
          new BufferedMutatorParams(TableName.valueOf(tableName))
              .writeBufferSize(4 * 1024 * 1024));

      String taskId = conf.get("mapreduce.task.attempt.id");
      Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId);
      if (!matcher.matches()) {
        throw new RuntimeException("Strange task ID: " + taskId);
      }
      shortTaskId = matcher.group(1);

      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
      refsWritten = context.getCounter(Counters.REFERENCES_WRITTEN);
    }

    @Override
    public void cleanup(Context context) throws IOException {
      mutator.close();
      connection.close();
    }

    @Override
    protected void map(NullWritable key, NullWritable value,
        Context context) throws IOException, InterruptedException {

      // Each row key is 8 bytes of byte-swapped row index followed by "/" + the short
      // task id, so rows written by different map tasks never collide.
      String suffix = "/" + shortTaskId;
      byte[] row = Bytes.add(new byte[8], Bytes.toBytes(suffix));

      int BLOCK_SIZE = (int) (recordsToWrite / 100);

      for (long i = 0; i < recordsToWrite;) {
        long blockStart = i;
        for (long idxInBlock = 0;
            idxInBlock < BLOCK_SIZE && i < recordsToWrite;
            idxInBlock++, i++) {

          long byteSwapped = swapLong(i);
          Bytes.putLong(row, 0, byteSwapped);

          Put p = new Put(row);
          p.addColumn(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY);
          if (blockStart > 0) {
            for (int j = 0; j < numBackReferencesPerRow; j++) {
              long referredRow = blockStart - BLOCK_SIZE + rand.nextInt(BLOCK_SIZE);
              Bytes.putLong(row, 0, swapLong(referredRow));
              p.addColumn(TEST_FAMILY, row, HConstants.EMPTY_BYTE_ARRAY);
            }
            refsWritten.increment(1);
          }
          rowsWritten.increment(1);
          mutator.mutate(p);

          if (i % 100 == 0) {
            context.setStatus("Written " + i + "/" + recordsToWrite + " records");
            context.progress();
          }
        }
        // End of block, flush all of them before we start writing anything
        // pointing to these!
        mutator.flush();
      }
    }
  }

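  // Verification is a reduce-side self-join over the loaded table: VerifyMapper emits
  // (rowKey, EMPTY) for each row it sees and (referencedRowKey, rowKey) for every
  // back-reference stored in that row. VerifyReducer then gets, per key, the marker for
  // the row itself plus all rows referencing it; referrers without a marker mean the
  // referenced row was lost.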
  public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
    static final BytesWritable EMPTY = new BytesWritable(HConstants.EMPTY_BYTE_ARRAY);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      BytesWritable bwKey = new BytesWritable(key.get());
      BytesWritable bwVal = new BytesWritable();
      for (Cell kv : value.listCells()) {
        if (Bytes.compareTo(TEST_QUALIFIER, 0, TEST_QUALIFIER.length,
            kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()) == 0) {
          context.write(bwKey, EMPTY);
        } else {
          bwVal.set(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
          context.write(bwVal, bwKey);
        }
      }
    }
  }

  public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
    private Counter refsChecked;
    private Counter rowsWritten;

    @Override
    public void setup(Context context) throws IOException {
      refsChecked = context.getCounter(Counters.REFERENCES_CHECKED);
      rowsWritten = context.getCounter(Counters.ROWS_WRITTEN);
    }

    @Override
    protected void reduce(BytesWritable referredRow, Iterable<BytesWritable> referrers,
        VerifyReducer.Context ctx) throws IOException, InterruptedException {
      boolean gotOriginalRow = false;
      int refCount = 0;

      for (BytesWritable ref : referrers) {
        if (ref.getLength() == 0) {
          assert !gotOriginalRow;
          gotOriginalRow = true;
        } else {
          refCount++;
        }
      }
      refsChecked.increment(refCount);

      if (!gotOriginalRow) {
        String parsedRow = makeRowReadable(referredRow.getBytes(), referredRow.getLength());
        String binRow = Bytes.toStringBinary(referredRow.getBytes(), 0, referredRow.getLength());
        LOG.error("Reference error row " + parsedRow);
        ctx.write(new Text(binRow), new Text(parsedRow));
        rowsWritten.increment(1);
      }
    }

    private String makeRowReadable(byte[] bytes, int length) {
      long rowIdx = swapLong(Bytes.toLong(bytes, 0));
      String suffix = Bytes.toString(bytes, 8, length - 8);

      return "Row #" + rowIdx + " suffix " + suffix;
    }
  }

  protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "load-output");
    LOG.info("Load output dir: " + outputDir);

    NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
    conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());

    Job job = Job.getInstance(conf);
    job.setJobName(TEST_NAME + " Load for " + htd.getTableName());
    job.setJarByClass(this.getClass());
    setMapperClass(job);
    job.setInputFormatClass(NMapInputFormat.class);
    job.setNumReduceTasks(0);
    setJobScannerConf(job);
    FileOutputFormat.setOutputPath(job, outputDir);

    TableMapReduceUtil.addDependencyJars(job);

    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
    TableMapReduceUtil.initCredentials(job);
    assertTrue(job.waitForCompletion(true));
    return job;
  }

  protected void setMapperClass(Job job) {
    job.setMapperClass(LoadMapper.class);
  }

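  // The verify job reuses Counters.ROWS_WRITTEN to count referenced-but-missing rows
  // (see VerifyReducer); doVerify() asserts that this counter is zero. Any offending keys
  // are left in the verify output directory and can be fed to the "search" step below.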
  protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
    Path outputDir = getTestDir(TEST_NAME, "verify-output");
    LOG.info("Verify output dir: " + outputDir);

    Job job = Job.getInstance(conf);
    job.setJarByClass(this.getClass());
    job.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
    setJobScannerConf(job);

    Scan scan = new Scan();

    TableMapReduceUtil.initTableMapperJob(
        htd.getTableName().getNameAsString(), scan, VerifyMapper.class,
        BytesWritable.class, BytesWritable.class, job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), AbstractHBaseTool.class);
    int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
    TableMapReduceUtil.setScannerCaching(job, scannerCaching);

    job.setReducerClass(VerifyReducer.class);
    job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT));
    FileOutputFormat.setOutputPath(job, outputDir);
    assertTrue(job.waitForCompletion(true));

    long numOutputRecords = job.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue();
    assertEquals(0, numOutputRecords);
  }

  /**
   * Tool to search for missing rows in WALs and hfiles.
   * Pass in a file or directory of keys to search for. The key file must have been written
   * by the verify step (we depend on the format it writes out). We read the keys in and then
   * search for them in the hbase WALs and oldWALs dirs (some of this is TODO).
   */
  public static class WALSearcher extends WALPlayer {
    public WALSearcher(Configuration conf) {
      super(conf);
    }

    /**
     * The actual searcher mapper.
     */
    public static class WALMapperSearcher extends WALMapper {
      private SortedSet<byte[]> keysToFind;
      private AtomicInteger rows = new AtomicInteger(0);

      @Override
      public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
          throws IOException {
        super.setup(context);
        try {
          this.keysToFind = readKeysToSearch(context.getConfiguration());
          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
        } catch (InterruptedException e) {
          throw new InterruptedIOException(e.toString());
        }
      }

      @Override
      protected boolean filter(Context context, Cell cell) {
        // TODO: Can I do a better compare than this copying out key?
        byte[] row = new byte[cell.getRowLength()];
        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
        boolean b = this.keysToFind.contains(row);
        if (b) {
          String keyStr = Bytes.toStringBinary(row);
          try {
            LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
          } catch (IOException | InterruptedException e) {
            LOG.warn(e.toString(), e);
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
          }
          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
        }
        return b;
      }
    }

    // Substitute the above WALMapperSearcher for WALPlayer's default mapper.
    @Override
    public Job createSubmittableJob(String[] args) throws IOException {
      Job job = super.createSubmittableJob(args);
      job.setJarByClass(WALMapperSearcher.class);
      job.setMapperClass(WALMapperSearcher.class);
      job.setOutputFormatClass(NullOutputFormat.class);
      return job;
    }
  }

  static final String FOUND_GROUP_KEY = "Found";
  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

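  // The verify reducer writes each missing row through the job's (default) TextOutputFormat
  // as "<row key in Bytes.toStringBinary form> TAB <human-readable form>", so the key to
  // search for is the first whitespace-separated token on each line.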
  static SortedSet<byte[]> readKeysToSearch(final Configuration conf)
      throws IOException, InterruptedException {
    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
    FileSystem fs = FileSystem.get(conf);
    SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    if (!fs.exists(keysInputDir)) {
      throw new FileNotFoundException(keysInputDir.toString());
    }
    if (!fs.isDirectory(keysInputDir)) {
      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
      readFileToSearch(conf, fs, keyFileStatus, result);
    } else {
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        // Skip "_SUCCESS" file.
        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
        readFileToSearch(conf, fs, keyFileStatus, result);
      }
    }
    return result;
  }

  private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
      final FileSystem fs, final FileStatus keyFileStatus, SortedSet<byte[]> result)
      throws IOException, InterruptedException {
    // Verify uses file output format and writes <Text, Text>. We can read it as a text file.
    try (InputStream in = fs.open(keyFileStatus.getPath());
        BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
      // Extract the key (the first token on each line) and record it as a missing row.
      String line;
      while ((line = reader.readLine()) != null) {
        if (line.isEmpty()) continue;

        String[] parts = line.split("\\s+");
        if (parts.length >= 1) {
          String key = parts[0];
          result.add(Bytes.toBytesBinary(key));
        } else {
          LOG.info("Cannot parse key from: " + line);
        }
      }
    }
    return result;
  }

  private int doSearch(Configuration conf, String keysDir) throws Exception {
    Path inputDir = new Path(keysDir);

    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
    SortedSet<byte[]> keys = readKeysToSearch(getConf());
    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
    LOG.info("Count of keys to find: " + keys.size());
    for (byte[] key : keys) LOG.info("Key: " + Bytes.toStringBinary(key));
    // Now read all WALs. In two dirs. Presumes certain layout.
    Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
    Path oldWalsDir =
        new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
    LOG.info("Running Search with keys inputDir=" + inputDir +
        " against " + getConf().get(HConstants.HBASE_DIR));
    int ret = ToolRunner.run(new WALSearcher(getConf()), new String[] { walsDir.toString(), "" });
    if (ret != 0) return ret;
    return ToolRunner.run(new WALSearcher(getConf()), new String[] { oldWalsDir.toString(), "" });
  }

  private static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    long lpr = job.getConfiguration().getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT) / 100;
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, (int) lpr);
  }

  public Path getTestDir(String testName, String subdir) throws IOException {
    Path testDir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = FileSystem.get(getConf());
    fs.deleteOnExit(testDir);

    return new Path(new Path(testDir, testName), subdir);
  }

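  // JUnit entry point. Mirrors the "loadAndVerify" command-line path: create a table
  // pre-split into 40 regions across the full 8-byte key range (0L through -1L), load it,
  // verify it, and drop it only if verification succeeded.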
  @Test
  public void testLoadAndVerify() throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TEST_NAME));
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    Admin admin = getTestingUtil(getConf()).getAdmin();
    admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), 40);

    doLoad(getConf(), htd);
    doVerify(getConf(), htd);

    // Only disable and drop if we succeeded to verify - otherwise it's useful
    // to leave it around for post-mortem
    getTestingUtil(getConf()).deleteTable(htd.getTableName());
  }

  @Override
  public void printUsage() {
    printUsage(this.getClass().getSimpleName() + " <options>"
        + " [-Doptions] <load|verify|loadAndVerify|search>", "Options", "");
    System.err.println("");
    System.err.println("  Loads a table with row dependencies and verifies the dependency chains");
    System.err.println("Options");
    System.err.println("  -Dloadmapper.table=<name>        Table to write/verify (default autogen)");
    System.err.println("  -Dloadmapper.backrefs=<n>        Number of backreferences per row (default 50)");
    System.err.println("  -Dloadmapper.num_to_write=<n>    Number of rows each mapper writes (default 100,000)");
    System.err.println("  -Dloadmapper.deleteAfter=<bool>  Delete the table after a successful verify (default true)");
    System.err.println("  -Dloadmapper.numPresplits=<n>    Number of presplit regions to start with (default 40)");
    System.err.println("  -Dloadmapper.map.tasks=<n>       Number of map tasks for load (default 200)");
    System.err.println("  -Dverify.reduce.tasks=<n>        Number of reduce tasks for verify (default 35)");
    System.err.println("  -Dverify.scannercaching=<n>      Number of rows for hbase scanner caching during verify (default 500)");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);

    String[] args = cmd.getArgs();
    if (args == null || args.length < 1) {
      printUsage();
      throw new RuntimeException("Incorrect Number of args.");
    }
    toRun = args[0];
    if (toRun.equalsIgnoreCase("search")) {
      if (args.length > 1) {
        keysDir = args[1];
      }
    }
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    IntegrationTestingUtility.setUseDistributedCluster(getConf());
    boolean doLoad = false;
    boolean doVerify = false;
    boolean doSearch = false;
    boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter", true);
    int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);

    if (toRun.equalsIgnoreCase("load")) {
      doLoad = true;
    } else if (toRun.equalsIgnoreCase("verify")) {
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
      doLoad = true;
      doVerify = true;
    } else if (toRun.equalsIgnoreCase("search")) {
      doLoad = false;
      doVerify = false;
      doSearch = true;
      if (keysDir == null) {
        System.err.println("Usage: search <KEYS_DIR>");
        return 1;
      }
    } else {
      System.err.println("Invalid argument " + toRun);
      printUsage();
      return 1;
    }

    // create HTableDescriptor for specified table
    TableName table = getTablename();
    HTableDescriptor htd = new HTableDescriptor(table);
    htd.addFamily(new HColumnDescriptor(TEST_FAMILY));

    if (doLoad) {
      try (Connection conn = ConnectionFactory.createConnection(getConf());
          Admin admin = conn.getAdmin()) {
        admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
        doLoad(getConf(), htd);
      }
    }
    if (doVerify) {
      doVerify(getConf(), htd);
      if (doDelete) {
        getTestingUtil(getConf()).deleteTable(htd.getTableName());
      }
    }
    if (doSearch) {
      return doSearch(getConf(), keysDir);
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return TableName.valueOf(getConf().get(TABLE_NAME_KEY, TEST_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(TEST_FAMILY));
  }

  public static void main(String[] argv) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(), argv);
    System.exit(ret);
  }
}