/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Random64;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * This is an integration test borrowed from goraci, written by Keith Turner,
 * which is in turn inspired by the Accumulo test called continuous ingest (ci).
 * The original source code can be found here:
 * https://github.com/keith-turner/goraci
 * https://github.com/enis/goraci/
 *
 * Apache Accumulo [0] has a simple test suite that verifies that data is not
 * lost at scale. This test suite is called continuous ingest. This test runs
 * many ingest clients that continually create linked lists containing 25
 * million nodes. At some point the clients are stopped and a map reduce job is
 * run to ensure no linked list has a hole. A hole indicates data was lost.
 *
 * The nodes in the linked list are random. This causes each linked list to
 * spread across the table.
 * Therefore if one part of a table loses data, then it
 * will be detected by references in another part of the table.
 *
 * THE ANATOMY OF THE TEST
 *
 * Below is a rough sketch of how data is written. For specific details look at
 * the Generator code.
 *
 * 1 Write out 1 million nodes
 * 2 Flush the client
 * 3 Write out 1 million that reference previous million
 * 4 If this is the 25th set of 1 million nodes, then update 1st set of million to point to last
 * 5 goto 1
 *
 * The key is that nodes only reference flushed nodes. Therefore a node should
 * never reference a missing node, even if the ingest client is killed at any
 * point in time.
 *
 * When running this test suite w/ Accumulo there is a script running in
 * parallel called the Agitator that randomly and continuously kills server
 * processes. The outcome was that many data loss bugs were found in Accumulo
 * by doing this. This test suite can also help find bugs that impact uptime
 * and stability when run for days or weeks.
 *
 * This test suite consists of the following:
 * - a few Java programs
 * - a little helper script to run the java programs
 * - a maven script to build it
 *
 * When generating data, it's best to have each map task generate a multiple of
 * 25 million. The reason for this is that circular linked lists are generated
 * every 25M. Not generating a multiple of 25M will result in some nodes in the
 * linked list not having references. The loss of an unreferenced node cannot
 * be detected.
 *
 * Below is a description of the Java programs:
 *
 * Generator - A map only job that generates data. As stated previously, it's best to generate data
 * in multiples of 25M. An option is also available to allow concurrent walkers to select and walk
 * random flushed loops during this phase.
 *
 * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED and
 * UNREFERENCED are ok; any UNDEFINED counts are bad. Do not run at the same
 * time as the Generator.
 *
 * Walker - A standalone program that starts following a linked list and emits timing info.
 *
 * Print - A standalone program that prints nodes in the linked list.
 *
 * Delete - A standalone program that deletes a single node.
 *
 * This class can be run as a unit test, as an integration test, or from the command line.
 *
 * ex:
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
 *   loop 2 1 100000 /temp 1 1000 50 1 0
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
  protected static final byte[] NO_KEY = new byte[1];

  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";

  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";

  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");

  // link to the id of the prev node in the linked list
  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");

  // identifier of the mapred task that generated this row
  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");

  // the id of the row within the same client.
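  // (Monotonically increasing per generator task; Verify reads it back when dumping
  // extra info on bad references.)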
  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");

  /** How many rows to write per map task. This has to be a multiple of 25M. */
  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
      = "IntegrationTestBigLinkedList.generator.num_rows";

  private static final String GENERATOR_NUM_MAPPERS_KEY
      = "IntegrationTestBigLinkedList.generator.map.tasks";

  private static final String GENERATOR_WIDTH_KEY
      = "IntegrationTestBigLinkedList.generator.width";

  private static final String GENERATOR_WRAP_KEY
      = "IntegrationTestBigLinkedList.generator.wrap";

  private static final String CONCURRENT_WALKER_KEY
      = "IntegrationTestBigLinkedList.generator.concurrentwalkers";

  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private static final int WIDTH_DEFAULT = 1000000;
  private static final int WRAP_DEFAULT = 25;
  private static final int ROWKEY_LENGTH = 16;

  private static final int CONCURRENT_WALKER_DEFAULT = 0;

  protected String toRun;
  protected String[] otherArgs;

  static class CINode {
    byte[] key;
    byte[] prev;
    String client;
    long count;
  }

  /**
   * A Map only job that generates random linked lists and stores them.
   */
  static class Generator extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);

    /**
     * Set this configuration if you want to test that single-column-family flush works. If set,
     * we will add a big column family and a small column family on either side of the usual ITBLL
     * 'meta' column family. When we write out the ITBLL, we will also add to the big column
     * family a value bigger than that for ITBLL and for small, something way smaller. The idea is
     * that when flush-by-column-family rather than by region is enabled, we can see if ITBLL is
     * broken in any way. Here is how you would pass it:
     * <p>
     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
     * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
     */
    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
        "generator.multiple.columnfamilies";

    /**
     * Set this configuration if you want to scale up the size of test data quickly.
     * <p>
     * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
     * -Dgenerator.big.family.value.size=1024 generator 1 10 output
     */
    public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size";

    public static enum Counts {
      SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION
    }

    public static final String USAGE = "Usage : " + Generator.class.getSimpleName() +
        " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" +
        " <num walker threads>] \n" +
        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
        "walkers will verify random flushed loops during generation.";

    public Job job;

    static class GeneratorInputFormat extends InputFormat<BytesWritable, NullWritable> {
      static class GeneratorInputSplit extends InputSplit implements Writable {
        @Override
        public long getLength() throws IOException, InterruptedException {
          return 1;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
          return new String[0];
        }

        @Override
        public void readFields(DataInput arg0) throws IOException {
        }

        @Override
        public void write(DataOutput arg0) throws IOException {
        }
      }

      static class GeneratorRecordReader extends RecordReader<BytesWritable, NullWritable> {
        private long count;
        private long numNodes;
        private Random64 rand;

        @Override
        public void close() throws IOException {
        }

        @Override
        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
          byte[] bytes = new byte[ROWKEY_LENGTH];
          rand.nextBytes(bytes);
          return new BytesWritable(bytes);
        }

        @Override
        public NullWritable getCurrentValue() throws IOException, InterruptedException {
          return NullWritable.get();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return (float) (count / (double) numNodes);
        }

        @Override
        public void initialize(InputSplit arg0, TaskAttemptContext context)
            throws IOException, InterruptedException {
          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
          // Use Random64 to avoid issue described in HBASE-21256.
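          // (java.util.Random keeps only 48 bits of state, so at this scale it repeats
          // values and hands out duplicate row keys; Random64 can range over the full
          // 64-bit space.)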
          rand = new Random64();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return count++ < numNodes;
        }
      }

      @Override
      public RecordReader<BytesWritable, NullWritable> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        GeneratorRecordReader rr = new GeneratorRecordReader();
        rr.initialize(split, context);
        return rr;
      }

      @Override
      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);

        ArrayList<InputSplit> splits = new ArrayList<>(numMappers);

        for (int i = 0; i < numMappers; i++) {
          splits.add(new GeneratorInputSplit());
        }

        return splits;
      }
    }

    /** Ensure output files from prev-job go to map inputs for current job */
    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
      @Override
      protected boolean isSplitable(JobContext context, Path filename) {
        return false;
      }
    }

    /**
     * Some ASCII art time:
     * <p>
     * [ . . . ] represents one batch of random longs of length WIDTH
     * <pre>
     *                _________________________
     *               |                  ______ |
     *               |                 |      ||
     *             .-+-----------------+-----.||
     *             | |                 |     |||
     * first   = [ . . . . . . . . . . . ]   |||
     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
     *             | | | | | | | | | | |     |||
     * prev    = [ . . . . . . . . . . . ]   |||
     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
     *             | | | | | | | | | | |     |||
     * current = [ . . . . . . . . . . . ]   |||
     *                                       |||
     * ...                                   |||
     *                                       |||
     * last    = [ . . . . . . . . . . . ]   |||
     *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
     *             |                 |________||
     *             |___________________________|
     * </pre>
     */
    static class GeneratorMapper
        extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {

      byte[][] first = null;
      byte[][] prev = null;
      byte[][] current = null;
      byte[] id;
      long count = 0;
      int i;
      BufferedMutator mutator;
      Connection connection;
      long numNodes;
      long wrap;
      int width;
      boolean multipleUnevenColumnFamilies;
      byte[] tinyValue = new byte[] { 't' };
      byte[] bigValue = null;
      Configuration conf;

      volatile boolean walkersStop;
      int numWalkers;
      volatile List<Long> flushedLoops = new ArrayList<>();
      List<Thread> walkers = new ArrayList<>();

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID());
        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
        instantiateHTable();
        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
        current = new byte[this.width][];
        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
        this.wrap = (long) wrapMultiplier * width;
        this.numNodes = context.getConfiguration().getLong(
            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long) WIDTH_DEFAULT * WRAP_DEFAULT);
        if (this.numNodes < this.wrap) {
          this.wrap = this.numNodes;
        }
        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
        this.numWalkers =
            context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT);
        this.walkersStop = false;
        this.conf = context.getConfiguration();
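
        // When exercising per-column-family flush, pre-build the oversized random value
        // written to the 'big' family; it must stay under the client's configured max
        // keyvalue size.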
        if (multipleUnevenColumnFamilies) {
          int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256);
          int limit = context.getConfiguration().getInt(
              ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
              ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT);

          Preconditions.checkArgument(
              n <= limit,
              "%s(%s) > %s(%s)",
              BIG_FAMILY_VALUE_SIZE_KEY, n, ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit);

          bigValue = new byte[n];
          ThreadLocalRandom.current().nextBytes(bigValue);
          LOG.info("Create a bigValue with " + n + " bytes.");
        }

        Preconditions.checkArgument(
            numNodes > 0,
            "numNodes(%s) <= 0",
            numNodes);
        Preconditions.checkArgument(
            numNodes % width == 0,
            "numNodes(%s) mod width(%s) != 0",
            numNodes, width);
        Preconditions.checkArgument(
            numNodes % wrap == 0,
            "numNodes(%s) mod wrap(%s) != 0",
            numNodes, wrap);
      }

      protected void instantiateHTable() throws IOException {
        mutator = connection.getBufferedMutator(
            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
                .writeBufferSize(4 * 1024 * 1024));
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        joinWalkers();
        mutator.close();
        connection.close();
      }

      @Override
      protected void map(BytesWritable key, NullWritable value, Context output)
          throws IOException {
        current[i] = new byte[key.getLength()];
        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
        if (++i == current.length) {
          LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i={}",
              current.length, count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]),
              i);
          persist(output, count, prev, current, id);
          i = 0;

          if (first == null) {
            first = current;
          }
          prev = current;
          current = new byte[this.width][];

          count += current.length;
          output.setStatus("Count " + count);

          if (count % wrap == 0) {
            // This block of code turns the 1 million linked lists of length 25 into one
            // giant circular linked list of 25 million nodes.
            circularLeftShift(first);
            persist(output, -1, prev, first, null);
            // At this point the entire loop has been flushed so we can add one of its nodes to the
            // concurrent walker
            if (numWalkers > 0) {
              addFlushed(key.getBytes());
              if (walkers.isEmpty()) {
                startWalkers(numWalkers, conf, output);
              }
            }
            first = null;
            prev = null;
          }
        }
      }

      private static <T> void circularLeftShift(T[] first) {
        T ez = first[0];
        System.arraycopy(first, 1, first, 0, first.length - 1);
        first[first.length - 1] = ez;
      }

      private void addFlushed(byte[] rowKey) {
        synchronized (flushedLoops) {
          flushedLoops.add(Bytes.toLong(rowKey));
          flushedLoops.notifyAll();
        }
      }

      protected void persist(Context output, long count, byte[][] prev, byte[][] current,
          byte[] id) throws IOException {
        for (int i = 0; i < current.length; i++) {

          if (i % 100 == 0) {
            // Tickle progress every so often else maprunner will think us hung
            output.progress();
          }

          Put put = new Put(current[i]);
          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);

          if (count >= 0) {
            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
          }
          if (id != null) {
            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
          }
          // See if we are to write multiple columns.
          if (this.multipleUnevenColumnFamilies) {
            // Use any column name.
            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
            // Use any column name.
            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
          }
          mutator.mutate(put);
        }

        mutator.flush();
      }

      private void startWalkers(int numWalkers, Configuration conf, Context context) {
        LOG.info("Starting " + numWalkers + " concurrent walkers");
        for (int i = 0; i < numWalkers; i++) {
          Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context));
          walker.start();
          walkers.add(walker);
        }
      }

      private void joinWalkers() {
        walkersStop = true;
        synchronized (flushedLoops) {
          flushedLoops.notifyAll();
        }
        for (Thread walker : walkers) {
          try {
            walker.join();
          } catch (InterruptedException e) {
            // no-op
          }
        }
      }

      /**
       * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper
       * by spawning ConcurrentWalkers with specified start nodes. These ConcurrentWalkers are
       * configured to only log erroneous nodes.
       */
      public class ContinuousConcurrentWalker implements Runnable {

        ConcurrentWalker walker;
        Configuration conf;
        Context context;
        Random rand;

        public ContinuousConcurrentWalker(Configuration conf, Context context) {
          this.conf = conf;
          this.context = context;
          rand = new Random();
        }

        @Override
        public void run() {
          while (!walkersStop) {
            try {
              long node = selectLoop();
              try {
                walkLoop(node);
              } catch (IOException e) {
                context.getCounter(Counts.IOEXCEPTION).increment(1L);
                return;
              }
            } catch (InterruptedException e) {
              return;
            }
          }
        }

        private void walkLoop(long node) throws IOException {
          walker = new ConcurrentWalker(context);
          walker.setConf(conf);
          walker.run(node, wrap);
        }

        private long selectLoop() throws InterruptedException {
          synchronized (flushedLoops) {
            while (flushedLoops.isEmpty() && !walkersStop) {
              flushedLoops.wait();
            }
            if (walkersStop) {
              throw new InterruptedException();
            }
            return flushedLoops.get(rand.nextInt(flushedLoops.size()));
          }
        }
      }

      public static class ConcurrentWalker extends WalkerBase {

        Context context;

        public ConcurrentWalker(Context context) {
          this.context = context;
        }

        public void run(long startKeyIn, long maxQueriesIn) throws IOException {

          long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE;
          byte[] startKey = Bytes.toBytes(startKeyIn);

          Connection connection = ConnectionFactory.createConnection(getConf());
          Table table = connection.getTable(getTableName(getConf()));
          long numQueries = 0;
          // If isSpecificStart is set, only walk one list from that particular node.
          // Note that in case of circular (or P-shaped) list it will walk forever, as is
          // the case in normal run without startKey.
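          // Here the caller passes the loop length (wrap) as maxQueries, so each walk
          // stops after one full circuit instead of spinning forever.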

          CINode node = findStartNode(table, startKey);
          if (node == null) {
            LOG.error("Start node not found: " + Bytes.toStringBinary(startKey));
            throw new IOException("Start node not found: " + startKeyIn);
          }
          while (numQueries < maxQueries) {
            numQueries++;
            byte[] prev = node.prev;
            long t1 = System.currentTimeMillis();
            node = getNode(prev, table, node);
            long t2 = System.currentTimeMillis();
            if (node == null) {
              LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev));
              context.getCounter(Counts.UNDEFINED).increment(1L);
            } else if (node.prev.length == NO_KEY.length) {
              LOG.error("ConcurrentWalker found TERMINATING NODE: " +
                  Bytes.toStringBinary(node.key));
              context.getCounter(Counts.TERMINATING).increment(1L);
            } else {
              // Increment for successful walk
              context.getCounter(Counts.SUCCESS).increment(1L);
            }
          }
          table.close();
          connection.close();
        }
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 3) {
        System.err.println(USAGE);
        return 1;
      }
      try {
        int numMappers = Integer.parseInt(args[0]);
        long numNodes = Long.parseLong(args[1]);
        Path tmpOutput = new Path(args[2]);
        Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
        Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
        Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]);
        return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
      } catch (NumberFormatException e) {
        System.err.println("Parsing generator arguments failed: " + e.getMessage());
        System.err.println(USAGE);
        return 1;
      }
    }

    protected void createSchema() throws IOException {
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Admin admin = conn.getAdmin()) {
        if (!admin.tableExists(tableName)) {
          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
          // Always add these families. Just skip writing to them when we do not test per CF flush.
          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
          // if -DuseMob=true force all data through mob path.
          if (conf.getBoolean("useMob", false)) {
            for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
              hcd.setMobEnabled(true);
              hcd.setMobThreshold(4);
            }
          }

          // If we want to pre-split, compute how many splits.
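          // Default policy: REGIONS_PER_SERVER regions per live region server, with
          // split points spread uniformly across the row key space (UniformSplit below).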
          if (conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
              HBaseTestingUtility.PRESPLIT_TEST_TABLE)) {
            int numberOfServers = admin.getRegionServers().size();
            if (numberOfServers == 0) {
              throw new IllegalStateException("No live regionservers");
            }
            int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
                HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
            int totalNumberOfRegions = numberOfServers * regionsPerServer;
            LOG.info("Number of live regionservers: " + numberOfServers + ", " +
                "pre-splitting table into " + totalNumberOfRegions + " regions " +
                "(default regions per server: " + regionsPerServer + ")");

            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);

            admin.createTable(htd, splits);
          } else {
            // Looks like we're just letting things play out.
            // Create a table with one region by default.
            // This will make the splitting work hard.
            admin.createTable(htd);
          }
        }
      } catch (MasterNotRunningException e) {
        LOG.error("Master not running", e);
        throw new IOException(e);
      }
    }

    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier, Integer numWalkers)
        throws Exception {
      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
          + ", numNodes=" + numNodes);
      Job job = Job.getInstance(getConf());

      job.setJobName("Random Input Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      job.setInputFormatClass(GeneratorInputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);

      job.setMapperClass(Mapper.class); // identity mapper

      FileOutputFormat.setOutputPath(job, tmpOutput);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier, Integer numWalkers)
        throws Exception {
      LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
      createSchema();
      job = Job.getInstance(getConf());

      job.setJobName("Link Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      FileInputFormat.setInputPaths(job, tmpOutput);
      job.setInputFormatClass(OneFilePerMapperSFIF.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers);

      setMapperForGenerator(job);

      job.setOutputFormatClass(NullOutputFormat.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
        ClassNotFoundException {
      boolean success = job.waitForCompletion(true);
      return success;
    }

    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(GeneratorMapper.class);
    }

    public int run(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier, Integer numWalkers)
        throws Exception {
      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier,
          numWalkers);
      if (ret > 0) {
        return ret;
      }
      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers);
    }

    public boolean verify() {
      try {
        Counters counters = job.getCounters();
        if (counters == null) {
          LOG.info("Counters object was null, Generator verification cannot be performed."
              + " This is commonly a result of insufficient YARN configuration.");
          return false;
        }

        if (counters.findCounter(Counts.TERMINATING).getValue() > 0 ||
            counters.findCounter(Counts.UNDEFINED).getValue() > 0 ||
            counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) {
          LOG.error("Concurrent walker failed to verify during Generation phase");
          LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue());
          LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue());
          LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue());
          return false;
        }
      } catch (IOException e) {
        LOG.info("Generator verification could not find counter");
        return false;
      }
      return true;
    }
  }

  /**
   * Tool to search missing rows in WALs and hfiles.
   * Pass in file or dir of keys to search for. Key file must have been written by Verify step
   * (we depend on the format it writes out). We'll read them in and then search in hbase
   * WALs and oldWALs dirs (some of this is TODO).
   */
  static class Search extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(Search.class);
    protected Job job;

    private static void printUsage(final String error) {
      if (error != null && error.length() > 0) {
        System.out.println("ERROR: " + error);
      }
      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1 || args.length > 2) {
        printUsage(null);
        return 1;
      }
      Path inputDir = new Path(args[0]);
      int numMappers = 1;
      if (args.length > 1) {
        numMappers = Integer.parseInt(args[1]);
      }
      return run(inputDir, numMappers);
    }

    /**
     * WALPlayer override that searches for keys loaded in the setup.
     */
    public static class WALSearcher extends WALPlayer {
      public WALSearcher(Configuration conf) {
        super(conf);
      }

      /**
       * The actual searcher mapper.
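       * Loads the keys that the Verify step flagged UNDEFINED and flags any WAL cell
       * whose row matches one of them.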
       */
      public static class WALMapperSearcher extends WALMapper {
        private SortedSet<byte[]> keysToFind;
        private AtomicInteger rows = new AtomicInteger(0);

        @Override
        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException {
          super.setup(context);
          try {
            this.keysToFind = readKeysToSearch(context.getConfiguration());
            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
          } catch (InterruptedException e) {
            throw new InterruptedIOException(e.toString());
          }
        }

        @Override
        protected boolean filter(Context context, Cell cell) {
          // TODO: Can I do a better compare than this copying out key?
          byte[] row = new byte[cell.getRowLength()];
          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
          boolean b = this.keysToFind.contains(row);
          if (b) {
            String keyStr = Bytes.toStringBinary(row);
            try {
              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
            } catch (IOException | InterruptedException e) {
              LOG.warn(e.toString(), e);
            }
            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
            }
            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
          }
          return b;
        }
      }

      // Put in place the above WALMapperSearcher.
      @Override
      public Job createSubmittableJob(String[] args) throws IOException {
        Job job = super.createSubmittableJob(args);
        // Call my class instead.
        job.setJarByClass(WALMapperSearcher.class);
        job.setMapperClass(WALMapperSearcher.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job;
      }
    }

    static final String FOUND_GROUP_KEY = "Found";
    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

    public int run(Path inputDir, int numMappers) throws Exception {
      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
      SortedSet<byte[]> keys = readKeysToSearch(getConf());
      if (keys.isEmpty()) {
        throw new RuntimeException("No keys to find");
      }
      LOG.info("Count of keys to find: " + keys.size());
      for (byte[] key : keys) {
        LOG.info("Key: " + Bytes.toStringBinary(key));
      }
      // Now read all WALs. In two dirs. Presumes certain layout.
      Path walsDir = new Path(
          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME);
      Path oldWalsDir = new Path(
          CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME);
      LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers +
          " against " + getConf().get(HConstants.HBASE_DIR));
      int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()),
          new String[] { walsDir.toString(), "" });
      if (ret != 0) {
        return ret;
      }
      return ToolRunner.run(getConf(), new WALSearcher(getConf()),
          new String[] { oldWalsDir.toString(), "" });
    }

    static SortedSet<byte[]> readKeysToSearch(final Configuration conf)
        throws IOException, InterruptedException {
      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
      FileSystem fs = FileSystem.get(conf);
      SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
      if (!fs.exists(keysInputDir)) {
        throw new FileNotFoundException(keysInputDir.toString());
      }
      if (!fs.isDirectory(keysInputDir)) {
        throw new UnsupportedOperationException("TODO");
      } else {
        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
        while (iterator.hasNext()) {
          LocatedFileStatus keyFileStatus = iterator.next();
          // Skip "_SUCCESS" file.
          if (keyFileStatus.getPath().getName().startsWith("_")) {
            continue;
          }
          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
        }
      }
      return result;
    }

    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
        InterruptedException {
      SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR);
      // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
      // what is missing.
      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
        InputSplit is =
            new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String[] {});
        rr.initialize(is, context);
        while (rr.nextKeyValue()) {
          rr.getCurrentKey();
          BytesWritable bw = rr.getCurrentValue();
          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
            byte[] key = new byte[rr.getCurrentKey().getLength()];
            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
                rr.getCurrentKey().getLength());
            result.add(key);
          }
        }
      }
      return result;
    }
  }

  /**
   * A Map Reduce job that verifies that the linked lists generated by
   * {@link Generator} do not have any holes.
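   * A hole shows up as an UNDEFINED count: a node that was referenced as 'prev' by
   * another node but was never found defined in the table.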
   */
  static class Verify extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });

    protected Job job;

    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
      private BytesWritable row = new BytesWritable();
      private BytesWritable ref = new BytesWritable();

      private boolean multipleUnevenColumnFamilies;

      @Override
      protected void setup(
          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        this.multipleUnevenColumnFamilies =
            isMultiUnevenColumnFamilies(context.getConfiguration());
      }

      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
        byte[] rowKey = key.get();
        row.set(rowKey, 0, rowKey.length);
        if (multipleUnevenColumnFamilies
            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
                TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
          context.write(row, DEF_LOST_FAMILIES);
        } else {
          context.write(row, DEF);
        }
        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
        if (prev != null && prev.length > 0) {
          ref.set(prev, 0, prev.length);
          context.write(ref, row);
        } else {
          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
        }
      }
    }

    /**
     * Don't change the order of these enums. Their ordinals are used as type flag when we emit
     * problems found from the reducer.
     */
    public static enum Counts {
      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
      LOST_FAMILIES
    }

    /**
     * Per reducer, we output problem rows as byte arrays so they can be used as input for
     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a flag
     * saying what sort of emission it is; the flag is the Counts enum ordinal as a short.
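     * For example, a row that is referenced but never defined is emitted as the two-byte
     * short 1 (Counts.UNDEFINED.ordinal()) followed by the bytes of the referencing row.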
     */
    public static class VerifyReducer extends
        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
      private ArrayList<byte[]> refs = new ArrayList<>();
      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
          Counts.UNREFERENCED.ordinal(), new byte[] {}));
      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
          Counts.LOST_FAMILIES.ordinal(), new byte[] {}));

      private AtomicInteger rows = new AtomicInteger(0);
      private Connection connection;

      @Override
      protected void setup(
          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        super.setup(context);
        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
      }

      @Override
      protected void cleanup(
          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        if (this.connection != null) {
          this.connection.close();
        }
        super.cleanup(context);
      }

      /**
       * @param ordinal the type flag to prepend
       * @param r the row bytes
       * @return Return new byte array that has <code>ordinal</code> as prefix on front taking up
       *         Bytes.SIZEOF_SHORT bytes followed by <code>r</code>
       */
      public static byte[] addPrefixFlag(final int ordinal, final byte[] r) {
        byte[] prefix = Bytes.toBytes((short) ordinal);
        if (prefix.length != Bytes.SIZEOF_SHORT) {
          throw new RuntimeException("Unexpected size: " + prefix.length);
        }
        byte[] result = new byte[prefix.length + r.length];
        System.arraycopy(prefix, 0, result, 0, prefix.length);
        System.arraycopy(r, 0, result, prefix.length, r.length);
        return result;
      }

      /**
       * @param bs the flagged byte array
       * @return Type from the Counts enum of this row. Reads prefix added by
       *         {@link #addPrefixFlag(int, byte[])}
       */
      public static Counts whichType(final byte[] bs) {
        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
        return Counts.values()[ordinal];
      }

      /**
       * @param bw the flagged row
       * @return Row bytes minus the type flag.
       */
      public static byte[] getRowOnly(BytesWritable bw) {
        byte[] bytes = new byte[bw.getLength() - Bytes.SIZEOF_SHORT];
        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
        return bytes;
      }

      @Override
      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
          throws IOException, InterruptedException {
        int defCount = 0;
        boolean lostFamilies = false;
        refs.clear();
        for (BytesWritable type : values) {
          if (type.getLength() == DEF.getLength()) {
            defCount++;
            if (type.getBytes()[0] == 1) {
              lostFamilies = true;
            }
          } else {
            byte[] bytes = new byte[type.getLength()];
            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
            refs.add(bytes);
          }
        }

        // TODO check for more than one def, should not happen
        StringBuilder refsSb = null;
        if (defCount == 0 || refs.size() != 1) {
          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
          refsSb = dumpExtraInfoOnRefs(key, context, refs);
          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
              (refsSb != null ? refsSb.toString() : ""));
        }
        if (lostFamilies) {
          String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
          context.getCounter(Counts.LOST_FAMILIES).increment(1);
          context.write(key, LOSTFAM);
        }

        if (defCount == 0 && refs.size() > 0) {
          // This is bad, found a node that is referenced but not defined. It must have been
          // lost, emit some info about this node for debugging purposes.
          // Write out a line per reference. If more than one, flag it.
          for (int i = 0; i < refs.size(); i++) {
            byte[] bs = refs.get(i);
            int ordinal;
            if (i <= 0) {
              ordinal = Counts.UNDEFINED.ordinal();
              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
              context.getCounter(Counts.UNDEFINED).increment(1);
            } else {
              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
            }
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            // Print out missing row; doing get on reference gives info on when the referencer
            // was added which can help a little debugging. This info is only available in mapper
            // output -- the 'Linked List error Key...' log message above. What we emit here is
            // useless for debugging.
            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
            context.getCounter("undef", keyString).increment(1);
          }
        } else if (defCount > 0 && refs.isEmpty()) {
          // node is defined but not referenced
          context.write(key, UNREF);
          context.getCounter(Counts.UNREFERENCED).increment(1);
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
            context.getCounter("unref", keyString).increment(1);
          }
        } else {
          if (refs.size() > 1) {
            // Skip first reference.
            for (int i = 1; i < refs.size(); i++) {
              context.write(key,
                  new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
            }
            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
          }
          // node is defined and referenced
          context.getCounter(Counts.REFERENCED).increment(1);
        }
      }

      /**
       * Dump out extra info around references if there are any. Helps debugging.
       * @return StringBuilder filled with references if any.
       * @throws IOException if reading the referencing rows fails
       */
      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
          final List<byte[]> refs)
          throws IOException {
        StringBuilder refsSb = null;
        if (refs.isEmpty()) {
          return refsSb;
        }
        refsSb = new StringBuilder();
        String comma = "";
        // If a row is a reference but has no define, print the content of the row that has
        // this row as a 'prev'; it will help debug. The missing row was written just before
        // the row we are dumping out here.
        TableName tn = getTableName(context.getConfiguration());
        try (Table t = this.connection.getTable(tn)) {
          for (byte[] ref : refs) {
            Result r = t.get(new Get(ref));
            List<Cell> cells = r.listCells();
            String ts = (cells != null && !cells.isEmpty())
                ? new java.util.Date(cells.get(0).getTimestamp()).toString() : "";
            byte[] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
            String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : "";
            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
            long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1;
            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
            String refRegionLocation = "";
            String keyRegionLocation = "";
            if (b != null && b.length > 0) {
              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
                HRegionLocation hrl = rl.getRegionLocation(b);
                if (hrl != null) {
                  refRegionLocation = hrl.toString();
                }
                // Key here probably has trailing zeros on it.
                hrl = rl.getRegionLocation(key.getBytes());
                if (hrl != null) {
                  keyRegionLocation = hrl.toString();
                }
              }
            }
            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
                ", refPrevEqualsKey=" +
                (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
                ", ref row date=" + ts + ", jobStr=" + jobStr +
                ", ref row count=" + count +
                ", ref row regionLocation=" + refRegionLocation +
                ", key row regionLocation=" + keyRegionLocation);
            refsSb.append(comma);
            comma = ",";
            refsSb.append(Bytes.toStringBinary(ref));
          }
        }
        return refsSb;
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 2) {
        System.out.println("Usage : " + Verify.class.getSimpleName()
            + " <output dir> <num reducers>");
        return 0;
      }

      String outputDir = args[0];
      int numReducers = Integer.parseInt(args[1]);

      return run(outputDir, numReducers);
    }

    public int run(String outputDir, int numReducers) throws Exception {
      return run(new Path(outputDir), numReducers);
    }

    public int run(Path outputDir, int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);

      job = Job.getInstance(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);
      if (isMultiUnevenColumnFamilies(getConf())) {
        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
      }

      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
          AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(BytesWritable.class);
      TextOutputFormat.setOutputPath(job, outputDir);

      boolean success = job.waitForCompletion(true);

      if (success) {
        Counters counters = job.getCounters();
        if (null == counters) {
          LOG.warn("Counters were null, cannot verify Job completion."
1364 + " This is commonly a result of insufficient YARN configuration."); 1365 // We don't have access to the counters to know if we have "bad" counts 1366 return 0; 1367 } 1368 1369 // If we find no unexpected values, the job didn't outright fail 1370 if (verifyUnexpectedValues(counters)) { 1371 // We didn't check referenced+unreferenced counts, leave that to visual inspection 1372 return 0; 1373 } 1374 } 1375 1376 // We failed 1377 return 1; 1378 } 1379 1380 public boolean verify(long expectedReferenced) throws Exception { 1381 if (job == null) { 1382 throw new IllegalStateException("You should call run() first"); 1383 } 1384 1385 Counters counters = job.getCounters(); 1386 if (counters == null) { 1387 LOG.info("Counters object was null, write verification cannot be performed." 1388 + " This is commonly a result of insufficient YARN configuration."); 1389 return false; 1390 } 1391 1392 // Run through each check, even if we fail one early 1393 boolean success = verifyExpectedValues(expectedReferenced, counters); 1394 1395 if (!verifyUnexpectedValues(counters)) { 1396 // We found counter objects which imply failure 1397 success = false; 1398 } 1399 1400 if (!success) { 1401 handleFailure(counters); 1402 } 1403 return success; 1404 } 1405 1406 /** 1407 * Verify the values in the Counters against the expected number of entries written. 1408 * 1409 * @param expectedReferenced 1410 * Expected number of referenced entrires 1411 * @param counters 1412 * The Job's Counters object 1413 * @return True if the values match what's expected, false otherwise 1414 */ 1415 protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) { 1416 final Counter referenced = counters.findCounter(Counts.REFERENCED); 1417 final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED); 1418 boolean success = true; 1419 1420 if (expectedReferenced != referenced.getValue()) { 1421 LOG.error("Expected referenced count does not match with actual referenced count. " + 1422 "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue()); 1423 success = false; 1424 } 1425 1426 if (unreferenced.getValue() > 0) { 1427 final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES); 1428 boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue()); 1429 LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue() 1430 + (couldBeMultiRef ? "; could be due to duplicate random numbers" : "")); 1431 success = false; 1432 } 1433 1434 return success; 1435 } 1436 1437 /** 1438 * Verify that the Counters don't contain values which indicate an outright failure from the Reducers. 1439 * 1440 * @param counters 1441 * The Job's counters 1442 * @return True if the "bad" counter objects are 0, false otherwise 1443 */ 1444 protected boolean verifyUnexpectedValues(Counters counters) { 1445 final Counter undefined = counters.findCounter(Counts.UNDEFINED); 1446 final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES); 1447 boolean success = true; 1448 1449 if (undefined.getValue() > 0) { 1450 LOG.error("Found an undefined node. 
        success = false;
      }

      if (lostfamilies.getValue() > 0) {
        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
        success = false;
      }

      return success;
    }

    protected void handleFailure(Counters counters) throws IOException {
      Configuration conf = job.getConfiguration();
      TableName tableName = getTableName(conf);
      try (Connection conn = ConnectionFactory.createConnection(conf)) {
        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
          CounterGroup g = counters.getGroup("undef");
          Iterator<Counter> it = g.iterator();
          while (it.hasNext()) {
            String keyString = it.next().getName();
            byte[] key = Bytes.toBytes(keyString);
            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("undefined row " + keyString + ", " + loc);
          }
          g = counters.getGroup("unref");
          it = g.iterator();
          while (it.hasNext()) {
            String keyString = it.next().getName();
            byte[] key = Bytes.toBytes(keyString);
            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("unreferenced row " + keyString + ", " + loc);
          }
        }
      }
    }
  }

  /**
   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration
   * adds more data.
   */
  static class Loop extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(Loop.class);
    private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " +
        "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" +
        " <num walker threads>] \n" +
        "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" +
        "walkers will select and verify random flushed loops during generation.";

    IntegrationTestBigLinkedList it;

    protected void runGenerator(int numMappers, long numNodes,
        String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers)
        throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new Generator();
      generator.setConf(getConf());
      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier,
          numWalkers);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
      if (numWalkers > 0) {
        if (!generator.verify()) {
          throw new RuntimeException("Generator.verify failed");
        }
      }
    }

    protected void runVerify(String outputDir,
        int numReducers, long expectedNumNodes) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID.
      Path iterationOutput = new Path(outputPath, uuid.toString());

      Verify verify = new Verify();
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }
      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }
Total nodes=" + expectedNumNodes); 1541 } 1542 1543 @Override 1544 public int run(String[] args) throws Exception { 1545 if (args.length < 5) { 1546 System.err.println(USAGE); 1547 return 1; 1548 } 1549 try { 1550 int numIterations = Integer.parseInt(args[0]); 1551 int numMappers = Integer.parseInt(args[1]); 1552 long numNodes = Long.parseLong(args[2]); 1553 String outputDir = args[3]; 1554 int numReducers = Integer.parseInt(args[4]); 1555 Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); 1556 Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); 1557 Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]); 1558 1559 long expectedNumNodes = 0; 1560 1561 if (numIterations < 0) { 1562 numIterations = Integer.MAX_VALUE; //run indefinitely (kind of) 1563 } 1564 LOG.info("Running Loop with args:" + Arrays.deepToString(args)); 1565 for (int i = 0; i < numIterations; i++) { 1566 LOG.info("Starting iteration = " + i); 1567 runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers); 1568 expectedNumNodes += numMappers * numNodes; 1569 runVerify(outputDir, numReducers, expectedNumNodes); 1570 } 1571 return 0; 1572 } catch (NumberFormatException e) { 1573 System.err.println("Parsing loop arguments failed: " + e.getMessage()); 1574 System.err.println(USAGE); 1575 return 1; 1576 } 1577 } 1578 } 1579 1580 /** 1581 * A stand alone program that prints out portions of a list created by {@link Generator} 1582 */ 1583 private static class Print extends Configured implements Tool { 1584 @Override 1585 public int run(String[] args) throws Exception { 1586 Options options = new Options(); 1587 options.addOption("s", "start", true, "start key"); 1588 options.addOption("e", "end", true, "end key"); 1589 options.addOption("l", "limit", true, "number to print"); 1590 1591 GnuParser parser = new GnuParser(); 1592 CommandLine cmd = null; 1593 try { 1594 cmd = parser.parse(options, args); 1595 if (cmd.getArgs().length != 0) { 1596 throw new ParseException("Command takes no arguments"); 1597 } 1598 } catch (ParseException e) { 1599 System.err.println("Failed to parse command line " + e.getMessage()); 1600 System.err.println(); 1601 HelpFormatter formatter = new HelpFormatter(); 1602 formatter.printHelp(getClass().getSimpleName(), options); 1603 System.exit(-1); 1604 } 1605 1606 Connection connection = ConnectionFactory.createConnection(getConf()); 1607 Table table = connection.getTable(getTableName(getConf())); 1608 1609 Scan scan = new Scan(); 1610 scan.setBatch(10000); 1611 1612 if (cmd.hasOption("s")) 1613 scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s"))); 1614 1615 if (cmd.hasOption("e")) 1616 scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e"))); 1617 1618 int limit = 0; 1619 if (cmd.hasOption("l")) 1620 limit = Integer.parseInt(cmd.getOptionValue("l")); 1621 else 1622 limit = 100; 1623 1624 ResultScanner scanner = table.getScanner(scan); 1625 1626 CINode node = new CINode(); 1627 Result result = scanner.next(); 1628 int count = 0; 1629 while (result != null && count++ < limit) { 1630 node = getCINode(result, node); 1631 System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key), 1632 Bytes.toStringBinary(node.prev), node.count, node.client); 1633 result = scanner.next(); 1634 } 1635 scanner.close(); 1636 table.close(); 1637 connection.close(); 1638 1639 return 0; 1640 } 1641 } 1642 1643 /** 1644 * A stand alone program that deletes a single node. 
  /**
   * A standalone program that deletes a single node.
   */
  private static class Delete extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 1) {
        System.err.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
        return 1;
      }
      byte[] val = Bytes.toBytesBinary(args[0]);

      org.apache.hadoop.hbase.client.Delete delete =
        new org.apache.hadoop.hbase.client.Delete(val);

      try (Connection connection = ConnectionFactory.createConnection(getConf());
        Table table = connection.getTable(getTableName(getConf()))) {
        table.delete(delete);
      }

      System.out.println("Delete successful");
      return 0;
    }
  }

  abstract static class WalkerBase extends Configured {
    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
      Scan scan = new Scan();
      scan.setStartRow(startKey);
      scan.setBatch(1);
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);

      long t1 = System.currentTimeMillis();
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      long t2 = System.currentTimeMillis();
      scanner.close();

      if (result != null) {
        CINode node = getCINode(result, new CINode());
        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
        return node;
      }

      System.out.println("FSR " + (t2 - t1));

      return null;
    }

    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
      Get get = new Get(row);
      get.addColumn(FAMILY_NAME, COLUMN_PREV);
      Result result = table.get(get);
      return getCINode(result, node);
    }
  }
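  /*
   * A minimal sketch of how the WalkerBase helpers are used (this mirrors the
   * Walker tool below; 'table' is an open Table for the test table):
   *
   *   byte[] startKey = new byte[ROWKEY_LENGTH];
   *   ThreadLocalRandom.current().nextBytes(startKey);
   *   CINode node = findStartNode(table, startKey);   // first node at/after startKey
   *   while (node != null && node.prev.length != NO_KEY.length) {
   *     node = getNode(node.prev, table, node);       // follow one prev pointer back
   *   }
   */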
  /**
   * A standalone program that follows a linked list created by {@link Generator} and prints
   * timing info.
   */
  private static class Walker extends WalkerBase implements Tool {

    public Walker() {
    }

    @Override
    public int run(String[] args) throws IOException {
      Options options = new Options();
      options.addOption("n", "num", true, "number of queries");
      options.addOption("s", "start", true, "key to start at, binary string");
      options.addOption("l", "logevery", true, "log every N queries");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      long maxQueries = Long.MAX_VALUE;
      if (cmd.hasOption('n')) {
        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
      }
      Random rand = new SecureRandom();
      boolean isSpecificStart = cmd.hasOption('s');

      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;

      // Use try-with-resources so the connection and table are closed even when the
      // walk fails part way through.
      try (Connection connection = ConnectionFactory.createConnection(getConf());
        Table table = connection.getTable(getTableName(getConf()))) {
        long numQueries = 0;
        // If isSpecificStart is set, only walk one list from that particular node.
        // Note that in case of circular (or P-shaped) list it will walk forever, as is
        // the case in normal run without startKey.
        while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
          if (!isSpecificStart) {
            startKey = new byte[ROWKEY_LENGTH];
            rand.nextBytes(startKey);
          }
          CINode node = findStartNode(table, startKey);
          if (node == null && isSpecificStart) {
            System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
          }
          numQueries++;
          while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
            byte[] prev = node.prev;
            long t1 = System.currentTimeMillis();
            node = getNode(prev, table, node);
            long t2 = System.currentTimeMillis();
            if (logEvery > 0 && numQueries % logEvery == 0) {
              System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
            }
            numQueries++;
            if (node == null) {
              System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
            } else if (node.prev.length == NO_KEY.length) {
              System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
            }
          }
        }
      }
      return 0;
    }
  }

  private static class Clean extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println("Usage: Clean <output dir>");
        return -1;
      }

      Path p = new Path(args[0]);
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (FileSystem fs = HFileSystem.get(conf);
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
        if (admin.tableExists(tableName)) {
          admin.disableTable(tableName);
          admin.deleteTable(tableName);
        }

        if (fs.exists(p)) {
          fs.delete(p, true);
        }
      }

      return 0;
    }
  }

  static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  private static CINode getCINode(Result result, CINode node) {
    node.key = Bytes.copy(result.getRow());
    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
    } else {
      node.prev = NO_KEY;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
    } else {
      node.count = -1;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
    } else {
      node.client = "";
    }
    return node;
  }
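  /*
   * Column layout that getCINode above decodes; every field falls back to a
   * sentinel when its column is absent from the Result:
   *
   *   row key                     -> node.key     (always present)
   *   FAMILY_NAME:COLUMN_PREV     -> node.prev    (NO_KEY when absent)
   *   FAMILY_NAME:COLUMN_COUNT    -> node.count   (-1 when absent)
   *   FAMILY_NAME:COLUMN_CLIENT   -> node.client  ("" when absent)
   */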
  protected IntegrationTestingUtility util;

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
  }

  @Test
  public void testContinuousIngest() throws Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
    Configuration conf = getTestingUtil(getConf()).getConfiguration();
    if (isMultiUnevenColumnFamilies(getConf())) {
      // make sure per CF flush is on
      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
        FlushAllLargeStoresPolicy.class.getName());
    }
    int ret = ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
      util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
    org.junit.Assert.assertEquals(0, ret);
  }

  private void usage() {
    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
    printCommands();
  }

  private void printCommands() {
    System.err.println("Commands:");
    System.err.println("  generator  Map only job that generates data.");
    System.err.println("  verify     A map reduce job that looks for holes. Check return code and");
    System.err.println("             look at the counts after running. Check that REFERENCED and");
    System.err.println("             UNREFERENCED counts are ok. Any UNDEFINED counts are bad. Do");
    System.err.println("             not run at the same time as the Generator.");
    System.err.println("  walker     "
      + "Standalone program that starts following a linked list & emits timing info.");
    System.err.println("  print      Standalone program that prints nodes in the linked list.");
    System.err.println("  delete     Standalone program that deletes a single node.");
    System.err.println("  loop       Program to loop through the Generator and Verify steps.");
    System.err.println("  clean      Program to clean all left over detritus.");
    System.err.println("  search     Search for missing keys.");
    System.err.println("");
    System.err.println("General options:");
    System.err.println("  -D" + TABLE_NAME_KEY + "=<tableName>");
    System.err.println("    Run using the <tableName> as the tablename. Defaults to "
      + DEFAULT_TABLE_NAME);
    System.err.println("  -D" + HBaseTestingUtility.REGIONS_PER_SERVER_KEY + "=<# regions>");
    System.err.println("    Create table with presplit regions per server. Defaults to "
      + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);

    System.err.println("  -DuseMob=<true|false>");
    System.err.println("    Create table so that the mob read/write path is forced. "
      + "Defaults to false");

    System.err.flush();
  }
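  /*
   * Example of combining the general options above with a command (values are
   * illustrative; <table-name-key> stands for the TABLE_NAME_KEY property name
   * defined earlier in this class):
   *
   *   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList \
   *     -D<table-name-key>=mytable loop 1 4 1000000 /tmp/itbll-out 4
   */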
  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    String[] args = cmd.getArgs();
    // get the class, run with the conf
    if (args.length < 1) {
      printUsage(this.getClass().getSimpleName()
        + " <general options> COMMAND [<COMMAND options>]", "General options:", "");
      printCommands();
      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
      throw new RuntimeException("Incorrect Number of args.");
    }
    toRun = args[0];
    otherArgs = Arrays.copyOfRange(args, 1, args.length);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    Tool tool = null;
    if (toRun.equalsIgnoreCase("Generator")) {
      tool = new Generator();
    } else if (toRun.equalsIgnoreCase("Verify")) {
      tool = new Verify();
    } else if (toRun.equalsIgnoreCase("Loop")) {
      Loop loop = new Loop();
      loop.it = this;
      tool = loop;
    } else if (toRun.equalsIgnoreCase("Walker")) {
      tool = new Walker();
    } else if (toRun.equalsIgnoreCase("Print")) {
      tool = new Print();
    } else if (toRun.equalsIgnoreCase("Delete")) {
      tool = new Delete();
    } else if (toRun.equalsIgnoreCase("Clean")) {
      tool = new Clean();
    } else if (toRun.equalsIgnoreCase("Search")) {
      tool = new Search();
    } else {
      usage();
      throw new RuntimeException("Unknown arg");
    }

    return ToolRunner.run(getConf(), tool, otherArgs);
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  @Override
  protected Set<String> getColumnFamilies() {
    if (isMultiUnevenColumnFamilies(getConf())) {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
        Bytes.toString(TINY_FAMILY_NAME));
    } else {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
    }
  }

  private static void setJobConf(Job job, int numMappers, long numNodes, Integer width,
    Integer wrapMultiplier, Integer numWalkers) {
    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
    if (width != null) {
      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
    }
    if (wrapMultiplier != null) {
      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
    }
    if (numWalkers != null) {
      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
    }
  }

  public static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
    System.exit(ret);
  }
}
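/*
 * Example end-to-end run from a shell (illustrative output dir), mirroring the
 * arguments testContinuousIngest passes programmatically: one iteration, one
 * mapper, two million nodes, one reducer:
 *
 *   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList \
 *     loop 1 1 2000000 /tmp/itbll-out 1
 */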