/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * This is an integration test borrowed from goraci, written by Keith Turner,
 * which is in turn inspired by the Accumulo test called continuous ingest (ci).
 * The original source code can be found here:
 * https://github.com/keith-turner/goraci
 * https://github.com/enis/goraci/
 *
 * Apache Accumulo [0] has a simple test suite that verifies that data is not
 * lost at scale. This test suite is called continuous ingest. This test runs
 * many ingest clients that continually create linked lists containing 25
 * million nodes. At some point the clients are stopped and a map reduce job is
 * run to ensure no linked list has a hole. A hole indicates data was lost.
 *
 * The nodes in the linked list are random. This causes each linked list to
 * spread across the table. Therefore if one part of a table loses data, then it
 * will be detected by references in another part of the table.
 *
 * THE ANATOMY OF THE TEST
 *
 * Below is a rough sketch of how data is written. For specific details look at
 * the Generator code.
 *
 * 1 Write out 1 million nodes
 * 2 Flush the client
 * 3 Write out 1 million that reference the previous million
 * 4 If this is the 25th set of 1 million nodes, then update the 1st set of a
 *   million to point to the last
 * 5 goto 1
 *
 * The key is that nodes only reference flushed nodes. Therefore a node should
 * never reference a missing node, even if the ingest client is killed at any
 * point in time.
 *
 * When running this test suite w/ Accumulo there is a script running in
 * parallel called the Agitator that randomly and continuously kills server
 * processes. The outcome was that many data loss bugs were found in Accumulo
 * by doing this. This test suite can also help find bugs that impact uptime
 * and stability when run for days or weeks.
 *
 * This test suite consists of the following:
 * - a few Java programs
 * - a little helper script to run the Java programs
 * - a maven script to build it
 *
 * When generating data, it is best to have each map task generate a multiple of
 * 25 million. The reason for this is that circular linked lists are generated
 * every 25M. Not generating a multiple of 25M will result in some nodes in the
 * linked list not having references. The loss of an unreferenced node cannot
 * be detected.
 *
 * Below is a description of the Java programs:
 *
 * Generator - A map only job that generates data. As stated previously, it is best to generate
 * data in multiples of 25M. An option is also available to allow concurrent walkers to select
 * and walk random flushed loops during this phase.
 *
 * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED
 * and UNREFERENCED are ok, any UNDEFINED counts are bad. Do not run at the same time as the
 * Generator.
 *
 * Walker - A standalone program that starts following a linked list and emits timing info.
 *
 * Print - A standalone program that prints nodes in the linked list.
 *
 * Delete - A standalone program that deletes a single node.
 *
 * This class can be run as a unit test, as an integration test, or from the command line.
 *
 * ex:
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
 *    loop 2 1 100000 /temp 1 1000 50 1 0
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
  protected static final byte[] NO_KEY = new byte[1];

  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";

  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";

  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");

  // link to the id of the prev node in the linked list
  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");

  // identifier of the mapred task that generated this row
  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
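
  // Summary of the row layout written by the Generator (see GeneratorMapper.persist below):
  // the row key is ROWKEY_LENGTH random bytes, and each row carries meta:prev (row key of the
  // node it references, or NO_KEY for an unlinked node), meta:client (the id of the generating
  // job/task) and meta:count (the position of the node within its client's stream). The 'big'
  // and 'tiny' families are only written when testing per-column-family flush.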
  // the id of the row within the same client.
  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");

  /** How many rows to write per map task. This has to be a multiple of 25M */
  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
    = "IntegrationTestBigLinkedList.generator.num_rows";

  private static final String GENERATOR_NUM_MAPPERS_KEY
    = "IntegrationTestBigLinkedList.generator.map.tasks";

  private static final String GENERATOR_WIDTH_KEY
    = "IntegrationTestBigLinkedList.generator.width";

  private static final String GENERATOR_WRAP_KEY
    = "IntegrationTestBigLinkedList.generator.wrap";

  private static final String CONCURRENT_WALKER_KEY
    = "IntegrationTestBigLinkedList.generator.concurrentwalkers";

  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster

  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters

  private static final int WIDTH_DEFAULT = 1000000;
  private static final int WRAP_DEFAULT = 25;
  private static final int ROWKEY_LENGTH = 16;

  private static final int CONCURRENT_WALKER_DEFAULT = 0;

  protected String toRun;
  protected String[] otherArgs;

  static class CINode {
    byte[] key;
    byte[] prev;
    String client;
    long count;
  }

  /**
   * A Map only job that generates random linked lists and stores them.
   */
  static class Generator extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(Generator.class);

    /**
     * Set this configuration if you want to test that single-column family flush works. If set,
     * we will add a big column family and a small column family on either side of the usual ITBLL
     * 'meta' column family. When we write out the ITBLL, we will also add to the big column family
     * a value bigger than that for ITBLL and for small, something way smaller. The idea is that
     * when flush-by-column family rather than by region is enabled, we can see if ITBLL is broken
     * in any way.
Here is how you would pass it: 264 * <p> 265 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 266 * -Dgenerator.multiple.columnfamilies=true generator 1 10 g 267 */ 268 public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY = 269 "generator.multiple.columnfamilies"; 270 271 public static enum Counts { 272 SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION 273 } 274 275 public static final String USAGE = "Usage : " + Generator.class.getSimpleName() + 276 " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" + 277 " <num walker threads>] \n" + 278 "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" + 279 "walkers will verify random flushed loop during Generation."; 280 281 public Job job; 282 283 static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> { 284 static class GeneratorInputSplit extends InputSplit implements Writable { 285 @Override 286 public long getLength() throws IOException, InterruptedException { 287 return 1; 288 } 289 @Override 290 public String[] getLocations() throws IOException, InterruptedException { 291 return new String[0]; 292 } 293 @Override 294 public void readFields(DataInput arg0) throws IOException { 295 } 296 @Override 297 public void write(DataOutput arg0) throws IOException { 298 } 299 } 300 301 static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> { 302 private long count; 303 private long numNodes; 304 private Random rand; 305 306 @Override 307 public void close() throws IOException { 308 } 309 310 @Override 311 public BytesWritable getCurrentKey() throws IOException, InterruptedException { 312 byte[] bytes = new byte[ROWKEY_LENGTH]; 313 rand.nextBytes(bytes); 314 return new BytesWritable(bytes); 315 } 316 317 @Override 318 public NullWritable getCurrentValue() throws IOException, InterruptedException { 319 return NullWritable.get(); 320 } 321 322 @Override 323 public float getProgress() throws IOException, InterruptedException { 324 return (float)(count / (double)numNodes); 325 } 326 327 @Override 328 public void initialize(InputSplit arg0, TaskAttemptContext context) 329 throws IOException, InterruptedException { 330 numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000); 331 // Use SecureRandom to avoid issue described in HBASE-13382. 
332 rand = new SecureRandom(); 333 } 334 335 @Override 336 public boolean nextKeyValue() throws IOException, InterruptedException { 337 return count++ < numNodes; 338 } 339 340 } 341 342 @Override 343 public RecordReader<BytesWritable,NullWritable> createRecordReader( 344 InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 345 GeneratorRecordReader rr = new GeneratorRecordReader(); 346 rr.initialize(split, context); 347 return rr; 348 } 349 350 @Override 351 public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { 352 int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1); 353 354 ArrayList<InputSplit> splits = new ArrayList<>(numMappers); 355 356 for (int i = 0; i < numMappers; i++) { 357 splits.add(new GeneratorInputSplit()); 358 } 359 360 return splits; 361 } 362 } 363 364 /** Ensure output files from prev-job go to map inputs for current job */ 365 static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> { 366 @Override 367 protected boolean isSplitable(JobContext context, Path filename) { 368 return false; 369 } 370 } 371 372 /** 373 * Some ASCII art time: 374 * <p> 375 * [ . . . ] represents one batch of random longs of length WIDTH 376 * <pre> 377 * _________________________ 378 * | ______ | 379 * | | || 380 * .-+-----------------+-----.|| 381 * | | | ||| 382 * first = [ . . . . . . . . . . . ] ||| 383 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 384 * | | | | | | | | | | | ||| 385 * prev = [ . . . . . . . . . . . ] ||| 386 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 387 * | | | | | | | | | | | ||| 388 * current = [ . . . . . . . . . . . ] ||| 389 * ||| 390 * ... ||| 391 * ||| 392 * last = [ . . . . . . . . . . . ] ||| 393 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____||| 394 * | |________|| 395 * |___________________________| 396 * </pre> 397 */ 398 399 static class GeneratorMapper 400 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 401 402 byte[][] first = null; 403 byte[][] prev = null; 404 byte[][] current = null; 405 byte[] id; 406 long count = 0; 407 int i; 408 BufferedMutator mutator; 409 Connection connection; 410 long numNodes; 411 long wrap; 412 int width; 413 boolean multipleUnevenColumnFamilies; 414 byte[] tinyValue = new byte[] { 't' }; 415 byte[] bigValue = null; 416 Configuration conf; 417 418 volatile boolean walkersStop; 419 int numWalkers; 420 volatile List<Long> flushedLoops = new ArrayList<>(); 421 List<Thread> walkers = new ArrayList<>(); 422 423 @Override 424 protected void setup(Context context) throws IOException, InterruptedException { 425 id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID()); 426 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 427 instantiateHTable(); 428 this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT); 429 current = new byte[this.width][]; 430 int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT); 431 this.wrap = (long)wrapMultiplier * width; 432 this.numNodes = context.getConfiguration().getLong( 433 GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT); 434 if (this.numNodes < this.wrap) { 435 this.wrap = this.numNodes; 436 } 437 this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); 438 this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT); 439 this.walkersStop = false; 440 this.conf = context.getConfiguration(); 
441 } 442 443 protected void instantiateHTable() throws IOException { 444 mutator = connection.getBufferedMutator( 445 new BufferedMutatorParams(getTableName(connection.getConfiguration())) 446 .writeBufferSize(4 * 1024 * 1024)); 447 } 448 449 @Override 450 protected void cleanup(Context context) throws IOException ,InterruptedException { 451 joinWalkers(); 452 mutator.close(); 453 connection.close(); 454 } 455 456 @Override 457 protected void map(BytesWritable key, NullWritable value, Context output) throws IOException { 458 current[i] = new byte[key.getLength()]; 459 System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength()); 460 if (++i == current.length) { 461 LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" + 462 Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) + 463 ", i=" + i); 464 persist(output, count, prev, current, id); 465 i = 0; 466 467 if (first == null) { 468 first = current; 469 } 470 prev = current; 471 current = new byte[this.width][]; 472 473 count += current.length; 474 output.setStatus("Count " + count); 475 476 if (count % wrap == 0) { 477 // this block of code turns the 1 million linked list of length 25 into one giant 478 //circular linked list of 25 million 479 circularLeftShift(first); 480 persist(output, -1, prev, first, null); 481 // At this point the entire loop has been flushed so we can add one of its nodes to the 482 // concurrent walker 483 if (numWalkers > 0) { 484 addFlushed(key.getBytes()); 485 if (walkers.isEmpty()) { 486 startWalkers(numWalkers, conf, output); 487 } 488 } 489 first = null; 490 prev = null; 491 } 492 } 493 } 494 495 private static <T> void circularLeftShift(T[] first) { 496 T ez = first[0]; 497 System.arraycopy(first, 1, first, 0, first.length - 1); 498 first[first.length - 1] = ez; 499 } 500 501 private void addFlushed(byte[] rowKey) { 502 synchronized (flushedLoops) { 503 flushedLoops.add(Bytes.toLong(rowKey)); 504 flushedLoops.notifyAll(); 505 } 506 } 507 508 protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id) 509 throws IOException { 510 for (int i = 0; i < current.length; i++) { 511 512 if (i % 100 == 0) { 513 // Tickle progress every so often else maprunner will think us hung 514 output.progress(); 515 } 516 517 Put put = new Put(current[i]); 518 put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]); 519 520 if (count >= 0) { 521 put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i)); 522 } 523 if (id != null) { 524 put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id); 525 } 526 // See if we are to write multiple columns. 527 if (this.multipleUnevenColumnFamilies) { 528 // Use any column name. 529 put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue); 530 // If we've not allocated bigValue, do it now. Reuse same value each time. 531 if (this.bigValue == null) { 532 this.bigValue = new byte[current[i].length * 10]; 533 ThreadLocalRandom.current().nextBytes(this.bigValue); 534 } 535 // Use any column name. 
536 put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue); 537 } 538 mutator.mutate(put); 539 } 540 541 mutator.flush(); 542 } 543 544 private void startWalkers(int numWalkers, Configuration conf, Context context) { 545 LOG.info("Starting " + numWalkers + " concurrent walkers"); 546 for (int i = 0; i < numWalkers; i++) { 547 Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context)); 548 walker.start(); 549 walkers.add(walker); 550 } 551 } 552 553 private void joinWalkers() { 554 walkersStop = true; 555 synchronized (flushedLoops) { 556 flushedLoops.notifyAll(); 557 } 558 for (Thread walker : walkers) { 559 try { 560 walker.join(); 561 } catch (InterruptedException e) { 562 // no-op 563 } 564 } 565 } 566 567 /** 568 * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by 569 * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are 570 * configured to only log erroneous nodes. 571 */ 572 573 public class ContinuousConcurrentWalker implements Runnable { 574 575 ConcurrentWalker walker; 576 Configuration conf; 577 Context context; 578 Random rand; 579 580 public ContinuousConcurrentWalker(Configuration conf, Context context) { 581 this.conf = conf; 582 this.context = context; 583 rand = new Random(); 584 } 585 586 @Override 587 public void run() { 588 while (!walkersStop) { 589 try { 590 long node = selectLoop(); 591 try { 592 walkLoop(node); 593 } catch (IOException e) { 594 context.getCounter(Counts.IOEXCEPTION).increment(1l); 595 return; 596 } 597 } catch (InterruptedException e) { 598 return; 599 } 600 } 601 } 602 603 private void walkLoop(long node) throws IOException { 604 walker = new ConcurrentWalker(context); 605 walker.setConf(conf); 606 walker.run(node, wrap); 607 } 608 609 private long selectLoop () throws InterruptedException{ 610 synchronized (flushedLoops) { 611 while (flushedLoops.isEmpty() && !walkersStop) { 612 flushedLoops.wait(); 613 } 614 if (walkersStop) { 615 throw new InterruptedException(); 616 } 617 return flushedLoops.get(rand.nextInt(flushedLoops.size())); 618 } 619 } 620 } 621 622 public static class ConcurrentWalker extends WalkerBase { 623 624 Context context; 625 626 public ConcurrentWalker(Context context) {this.context = context;} 627 628 public void run(long startKeyIn, long maxQueriesIn) throws IOException { 629 630 long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE; 631 byte[] startKey = Bytes.toBytes(startKeyIn); 632 633 Connection connection = ConnectionFactory.createConnection(getConf()); 634 Table table = connection.getTable(getTableName(getConf())); 635 long numQueries = 0; 636 // If isSpecificStart is set, only walk one list from that particular node. 637 // Note that in case of circular (or P-shaped) list it will walk forever, as is 638 // the case in normal run without startKey. 
639 640 CINode node = findStartNode(table, startKey); 641 if (node == null) { 642 LOG.error("Start node not found: " + Bytes.toStringBinary(startKey)); 643 throw new IOException("Start node not found: " + startKeyIn); 644 } 645 while (numQueries < maxQueries) { 646 numQueries++; 647 byte[] prev = node.prev; 648 long t1 = System.currentTimeMillis(); 649 node = getNode(prev, table, node); 650 long t2 = System.currentTimeMillis(); 651 if (node == null) { 652 LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev)); 653 context.getCounter(Counts.UNDEFINED).increment(1l); 654 } else if (node.prev.length == NO_KEY.length) { 655 LOG.error("ConcurrentWalker found TERMINATING NODE: " + 656 Bytes.toStringBinary(node.key)); 657 context.getCounter(Counts.TERMINATING).increment(1l); 658 } else { 659 // Increment for successful walk 660 context.getCounter(Counts.SUCCESS).increment(1l); 661 } 662 } 663 table.close(); 664 connection.close(); 665 } 666 } 667 } 668 669 @Override 670 public int run(String[] args) throws Exception { 671 if (args.length < 3) { 672 System.err.println(USAGE); 673 return 1; 674 } 675 try { 676 int numMappers = Integer.parseInt(args[0]); 677 long numNodes = Long.parseLong(args[1]); 678 Path tmpOutput = new Path(args[2]); 679 Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]); 680 Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]); 681 Integer numWalkers = (args.length < 6) ? null : Integer.parseInt(args[5]); 682 return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 683 } catch (NumberFormatException e) { 684 System.err.println("Parsing generator arguments failed: " + e.getMessage()); 685 System.err.println(USAGE); 686 return 1; 687 } 688 } 689 690 protected void createSchema() throws IOException { 691 Configuration conf = getConf(); 692 TableName tableName = getTableName(conf); 693 try (Connection conn = ConnectionFactory.createConnection(conf); 694 Admin admin = conn.getAdmin()) { 695 if (!admin.tableExists(tableName)) { 696 HTableDescriptor htd = new HTableDescriptor(getTableName(getConf())); 697 htd.addFamily(new HColumnDescriptor(FAMILY_NAME)); 698 // Always add these families. Just skip writing to them when we do not test per CF flush. 699 htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME)); 700 htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME)); 701 // if -DuseMob=true force all data through mob path. 702 if (conf.getBoolean("useMob", false)) { 703 for (HColumnDescriptor hcd : htd.getColumnFamilies() ) { 704 hcd.setMobEnabled(true); 705 hcd.setMobThreshold(4); 706 } 707 } 708 709 // If we want to pre-split compute how many splits. 
710 if (conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY, 711 HBaseTestingUtility.PRESPLIT_TEST_TABLE)) { 712 int numberOfServers = 713 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 714 .getLiveServerMetrics().size(); 715 if (numberOfServers == 0) { 716 throw new IllegalStateException("No live regionservers"); 717 } 718 int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY, 719 HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER); 720 int totalNumberOfRegions = numberOfServers * regionsPerServer; 721 LOG.info("Number of live regionservers: " + numberOfServers + ", " + 722 "pre-splitting table into " + totalNumberOfRegions + " regions " + 723 "(default regions per server: " + regionsPerServer + ")"); 724 725 726 byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions); 727 728 admin.createTable(htd, splits); 729 } else { 730 // Looks like we're just letting things play out. 731 // Create a table with on region by default. 732 // This will make the splitting work hard. 733 admin.createTable(htd); 734 } 735 } 736 } catch (MasterNotRunningException e) { 737 LOG.error("Master not running", e); 738 throw new IOException(e); 739 } 740 } 741 742 public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, 743 Integer width, Integer wrapMultiplier, Integer numWalkers) 744 throws Exception { 745 LOG.info("Running RandomInputGenerator with numMappers=" + numMappers 746 + ", numNodes=" + numNodes); 747 Job job = Job.getInstance(getConf()); 748 749 job.setJobName("Random Input Generator"); 750 job.setNumReduceTasks(0); 751 job.setJarByClass(getClass()); 752 753 job.setInputFormatClass(GeneratorInputFormat.class); 754 job.setOutputKeyClass(BytesWritable.class); 755 job.setOutputValueClass(NullWritable.class); 756 757 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 758 759 job.setMapperClass(Mapper.class); //identity mapper 760 761 FileOutputFormat.setOutputPath(job, tmpOutput); 762 job.setOutputFormatClass(SequenceFileOutputFormat.class); 763 764 boolean success = jobCompletion(job); 765 766 return success ? 0 : 1; 767 } 768 769 public int runGenerator(int numMappers, long numNodes, Path tmpOutput, 770 Integer width, Integer wrapMultiplier, Integer numWalkers) 771 throws Exception { 772 LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes); 773 createSchema(); 774 job = Job.getInstance(getConf()); 775 776 job.setJobName("Link Generator"); 777 job.setNumReduceTasks(0); 778 job.setJarByClass(getClass()); 779 780 FileInputFormat.setInputPaths(job, tmpOutput); 781 job.setInputFormatClass(OneFilePerMapperSFIF.class); 782 job.setOutputKeyClass(NullWritable.class); 783 job.setOutputValueClass(NullWritable.class); 784 785 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 786 787 setMapperForGenerator(job); 788 789 job.setOutputFormatClass(NullOutputFormat.class); 790 791 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 792 TableMapReduceUtil.addDependencyJars(job); 793 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 794 AbstractHBaseTool.class); 795 TableMapReduceUtil.initCredentials(job); 796 797 boolean success = jobCompletion(job); 798 799 return success ? 
0 : 1; 800 } 801 802 protected boolean jobCompletion(Job job) throws IOException, InterruptedException, 803 ClassNotFoundException { 804 boolean success = job.waitForCompletion(true); 805 return success; 806 } 807 808 protected void setMapperForGenerator(Job job) { 809 job.setMapperClass(GeneratorMapper.class); 810 } 811 812 public int run(int numMappers, long numNodes, Path tmpOutput, 813 Integer width, Integer wrapMultiplier, Integer numWalkers) 814 throws Exception { 815 int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, 816 numWalkers); 817 if (ret > 0) { 818 return ret; 819 } 820 return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 821 } 822 823 public boolean verify() { 824 try { 825 Counters counters = job.getCounters(); 826 if (counters == null) { 827 LOG.info("Counters object was null, Generator verification cannot be performed." 828 + " This is commonly a result of insufficient YARN configuration."); 829 return false; 830 } 831 832 if (counters.findCounter(Counts.TERMINATING).getValue() > 0 || 833 counters.findCounter(Counts.UNDEFINED).getValue() > 0 || 834 counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) { 835 LOG.error("Concurrent walker failed to verify during Generation phase"); 836 LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue()); 837 LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue()); 838 LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue()); 839 return false; 840 } 841 } catch (IOException e) { 842 LOG.info("Generator verification could not find counter"); 843 return false; 844 } 845 return true; 846 } 847 } 848 849 /** 850 * Tool to search missing rows in WALs and hfiles. 851 * Pass in file or dir of keys to search for. Key file must have been written by Verify step 852 * (we depend on the format it writes out. We'll read them in and then search in hbase 853 * WALs and oldWALs dirs (Some of this is TODO). 854 */ 855 static class Search extends Configured implements Tool { 856 private static final Logger LOG = LoggerFactory.getLogger(Search.class); 857 protected Job job; 858 859 private static void printUsage(final String error) { 860 if (error != null && error.length() > 0) System.out.println("ERROR: " + error); 861 System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]"); 862 } 863 864 @Override 865 public int run(String[] args) throws Exception { 866 if (args.length < 1 || args.length > 2) { 867 printUsage(null); 868 return 1; 869 } 870 Path inputDir = new Path(args[0]); 871 int numMappers = 1; 872 if (args.length > 1) { 873 numMappers = Integer.parseInt(args[1]); 874 } 875 return run(inputDir, numMappers); 876 } 877 878 /** 879 * WALPlayer override that searches for keys loaded in the setup. 880 */ 881 public static class WALSearcher extends WALPlayer { 882 public WALSearcher(Configuration conf) { 883 super(conf); 884 } 885 886 /** 887 * The actual searcher mapper. 
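       * It loads the set of keys to look for (written out by the Verify step) in setup(), and in
       * filter() it counts and logs any WAL edit whose row is in that set, including the current
       * WALKey, so the origin of the missing row's edits can be traced.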
888 */ 889 public static class WALMapperSearcher extends WALMapper { 890 private SortedSet<byte []> keysToFind; 891 private AtomicInteger rows = new AtomicInteger(0); 892 893 @Override 894 public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) 895 throws IOException { 896 super.setup(context); 897 try { 898 this.keysToFind = readKeysToSearch(context.getConfiguration()); 899 LOG.info("Loaded keys to find: count=" + this.keysToFind.size()); 900 } catch (InterruptedException e) { 901 throw new InterruptedIOException(e.toString()); 902 } 903 } 904 905 @Override 906 protected boolean filter(Context context, Cell cell) { 907 // TODO: Can I do a better compare than this copying out key? 908 byte [] row = new byte [cell.getRowLength()]; 909 System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength()); 910 boolean b = this.keysToFind.contains(row); 911 if (b) { 912 String keyStr = Bytes.toStringBinary(row); 913 try { 914 LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey()); 915 } catch (IOException|InterruptedException e) { 916 LOG.warn(e.toString(), e); 917 } 918 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 919 context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); 920 } 921 context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1); 922 } 923 return b; 924 } 925 } 926 927 // Put in place the above WALMapperSearcher. 928 @Override 929 public Job createSubmittableJob(String[] args) throws IOException { 930 Job job = super.createSubmittableJob(args); 931 // Call my class instead. 932 job.setJarByClass(WALMapperSearcher.class); 933 job.setMapperClass(WALMapperSearcher.class); 934 job.setOutputFormatClass(NullOutputFormat.class); 935 return job; 936 } 937 } 938 939 static final String FOUND_GROUP_KEY = "Found"; 940 static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir"; 941 942 public int run(Path inputDir, int numMappers) throws Exception { 943 getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString()); 944 SortedSet<byte []> keys = readKeysToSearch(getConf()); 945 if (keys.isEmpty()) throw new RuntimeException("No keys to find"); 946 LOG.info("Count of keys to find: " + keys.size()); 947 for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key)); 948 // Now read all WALs. In two dirs. Presumes certain layout. 
949 Path walsDir = new Path( 950 CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME); 951 Path oldWalsDir = new Path( 952 CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME); 953 LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers + 954 " against " + getConf().get(HConstants.HBASE_DIR)); 955 int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()), 956 new String [] {walsDir.toString(), ""}); 957 if (ret != 0) { 958 return ret; 959 } 960 return ToolRunner.run(getConf(), new WALSearcher(getConf()), 961 new String [] {oldWalsDir.toString(), ""}); 962 } 963 964 static SortedSet<byte []> readKeysToSearch(final Configuration conf) 965 throws IOException, InterruptedException { 966 Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY)); 967 FileSystem fs = FileSystem.get(conf); 968 SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 969 if (!fs.exists(keysInputDir)) { 970 throw new FileNotFoundException(keysInputDir.toString()); 971 } 972 if (!fs.isDirectory(keysInputDir)) { 973 throw new UnsupportedOperationException("TODO"); 974 } else { 975 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false); 976 while(iterator.hasNext()) { 977 LocatedFileStatus keyFileStatus = iterator.next(); 978 // Skip "_SUCCESS" file. 979 if (keyFileStatus.getPath().getName().startsWith("_")) continue; 980 result.addAll(readFileToSearch(conf, fs, keyFileStatus)); 981 } 982 } 983 return result; 984 } 985 986 private static SortedSet<byte[]> readFileToSearch(final Configuration conf, 987 final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException, 988 InterruptedException { 989 SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 990 // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is 991 // what is missing. 992 TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); 993 try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr = 994 new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) { 995 InputSplit is = 996 new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {}); 997 rr.initialize(is, context); 998 while (rr.nextKeyValue()) { 999 rr.getCurrentKey(); 1000 BytesWritable bw = rr.getCurrentValue(); 1001 if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) { 1002 byte[] key = new byte[rr.getCurrentKey().getLength()]; 1003 System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey() 1004 .getLength()); 1005 result.add(key); 1006 } 1007 } 1008 } 1009 return result; 1010 } 1011 } 1012 1013 /** 1014 * A Map Reduce job that verifies that the linked lists generated by 1015 * {@link Generator} do not have any holes. 
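 * For each scanned row, the mapper emits the row itself as "defined" and, when the row carries a
 * meta:prev column, emits a second record keyed by the referenced row. The reducer then flags
 * rows that are referenced but never defined (UNDEFINED), defined but never referenced
 * (UNREFERENCED), or missing the extra column families when multi-family mode is on
 * (LOST_FAMILIES).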
   */
  static class Verify extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(Verify.class);
    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });

    protected Job job;

    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
      private BytesWritable row = new BytesWritable();
      private BytesWritable ref = new BytesWritable();

      private boolean multipleUnevenColumnFamilies;

      @Override
      protected void setup(
          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
      }

      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
        byte[] rowKey = key.get();
        row.set(rowKey, 0, rowKey.length);
        if (multipleUnevenColumnFamilies
            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
                TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
          context.write(row, DEF_LOST_FAMILIES);
        } else {
          context.write(row, DEF);
        }
        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
        if (prev != null && prev.length > 0) {
          ref.set(prev, 0, prev.length);
          context.write(ref, row);
        } else {
          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
        }
      }
    }

    /**
     * Don't change the order of these enums. Their ordinals are used as a type flag when we emit
     * problems found from the reducer.
     */
    public static enum Counts {
      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
      LOST_FAMILIES
    }

    /**
     * Per reducer, we output problem rows as byte arrays so they can be used as input for
     * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a two-byte flag
     * saying what sort of emission it is. The flag is the Counts enum ordinal as a short.
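     * For example, a row that is referenced but never defined is emitted with the two-byte prefix
     * encoding Counts.UNDEFINED.ordinal(), followed by the referencing row; whichType() reads the
     * prefix back to recover the Counts value and getRowOnly() strips it off again.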
1073 */ 1074 public static class VerifyReducer extends 1075 Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> { 1076 private ArrayList<byte[]> refs = new ArrayList<>(); 1077 private final BytesWritable UNREF = new BytesWritable(addPrefixFlag( 1078 Counts.UNREFERENCED.ordinal(), new byte[] {})); 1079 private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag( 1080 Counts.LOST_FAMILIES.ordinal(), new byte[] {})); 1081 1082 private AtomicInteger rows = new AtomicInteger(0); 1083 private Connection connection; 1084 1085 @Override 1086 protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1087 throws IOException, InterruptedException { 1088 super.setup(context); 1089 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 1090 } 1091 1092 @Override 1093 protected void cleanup( 1094 Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1095 throws IOException, InterruptedException { 1096 if (this.connection != null) { 1097 this.connection.close(); 1098 } 1099 super.cleanup(context); 1100 } 1101 1102 /** 1103 * @param ordinal 1104 * @param r 1105 * @return Return new byte array that has <code>ordinal</code> as prefix on front taking up 1106 * Bytes.SIZEOF_SHORT bytes followed by <code>r</code> 1107 */ 1108 public static byte[] addPrefixFlag(final int ordinal, final byte [] r) { 1109 byte[] prefix = Bytes.toBytes((short)ordinal); 1110 if (prefix.length != Bytes.SIZEOF_SHORT) { 1111 throw new RuntimeException("Unexpected size: " + prefix.length); 1112 } 1113 byte[] result = new byte[prefix.length + r.length]; 1114 System.arraycopy(prefix, 0, result, 0, prefix.length); 1115 System.arraycopy(r, 0, result, prefix.length, r.length); 1116 return result; 1117 } 1118 1119 /** 1120 * @param bs 1121 * @return Type from the Counts enum of this row. Reads prefix added by 1122 * {@link #addPrefixFlag(int, byte[])} 1123 */ 1124 public static Counts whichType(final byte [] bs) { 1125 int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT); 1126 return Counts.values()[ordinal]; 1127 } 1128 1129 /** 1130 * @param bw 1131 * @return Row bytes minus the type flag. 1132 */ 1133 public static byte[] getRowOnly(BytesWritable bw) { 1134 byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT]; 1135 System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length); 1136 return bytes; 1137 } 1138 1139 @Override 1140 public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context) 1141 throws IOException, InterruptedException { 1142 int defCount = 0; 1143 boolean lostFamilies = false; 1144 refs.clear(); 1145 for (BytesWritable type : values) { 1146 if (type.getLength() == DEF.getLength()) { 1147 defCount++; 1148 if (type.getBytes()[0] == 1) { 1149 lostFamilies = true; 1150 } 1151 } else { 1152 byte[] bytes = new byte[type.getLength()]; 1153 System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength()); 1154 refs.add(bytes); 1155 } 1156 } 1157 1158 // TODO check for more than one def, should not happen 1159 StringBuilder refsSb = null; 1160 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1161 if (defCount == 0 || refs.size() != 1) { 1162 refsSb = dumpExtraInfoOnRefs(key, context, refs); 1163 LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" + 1164 (refsSb != null? 
refsSb.toString(): "")); 1165 } 1166 if (lostFamilies) { 1167 LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families"); 1168 context.getCounter(Counts.LOST_FAMILIES).increment(1); 1169 context.write(key, LOSTFAM); 1170 } 1171 1172 if (defCount == 0 && refs.size() > 0) { 1173 // This is bad, found a node that is referenced but not defined. It must have been 1174 // lost, emit some info about this node for debugging purposes. 1175 // Write out a line per reference. If more than one, flag it.; 1176 for (int i = 0; i < refs.size(); i++) { 1177 byte[] bs = refs.get(i); 1178 int ordinal; 1179 if (i <= 0) { 1180 ordinal = Counts.UNDEFINED.ordinal(); 1181 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1182 context.getCounter(Counts.UNDEFINED).increment(1); 1183 } else { 1184 ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal(); 1185 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1186 } 1187 } 1188 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1189 // Print out missing row; doing get on reference gives info on when the referencer 1190 // was added which can help a little debugging. This info is only available in mapper 1191 // output -- the 'Linked List error Key...' log message above. What we emit here is 1192 // useless for debugging. 1193 context.getCounter("undef", keyString).increment(1); 1194 } 1195 } else if (defCount > 0 && refs.isEmpty()) { 1196 // node is defined but not referenced 1197 context.write(key, UNREF); 1198 context.getCounter(Counts.UNREFERENCED).increment(1); 1199 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1200 context.getCounter("unref", keyString).increment(1); 1201 } 1202 } else { 1203 if (refs.size() > 1) { 1204 // Skip first reference. 1205 for (int i = 1; i < refs.size(); i++) { 1206 context.write(key, 1207 new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i)))); 1208 } 1209 context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1); 1210 } 1211 // node is defined and referenced 1212 context.getCounter(Counts.REFERENCED).increment(1); 1213 } 1214 } 1215 1216 /** 1217 * Dump out extra info around references if there are any. Helps debugging. 1218 * @return StringBuilder filled with references if any. 1219 * @throws IOException 1220 */ 1221 private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context, 1222 final List<byte []> refs) 1223 throws IOException { 1224 StringBuilder refsSb = null; 1225 if (refs.isEmpty()) return refsSb; 1226 refsSb = new StringBuilder(); 1227 String comma = ""; 1228 // If a row is a reference but has no define, print the content of the row that has 1229 // this row as a 'prev'; it will help debug. The missing row was written just before 1230 // the row we are dumping out here. 1231 TableName tn = getTableName(context.getConfiguration()); 1232 try (Table t = this.connection.getTable(tn)) { 1233 for (byte [] ref : refs) { 1234 Result r = t.get(new Get(ref)); 1235 List<Cell> cells = r.listCells(); 1236 String ts = (cells != null && !cells.isEmpty())? 1237 new java.util.Date(cells.get(0).getTimestamp()).toString(): ""; 1238 byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT); 1239 String jobStr = (b != null && b.length > 0)? Bytes.toString(b): ""; 1240 b = r.getValue(FAMILY_NAME, COLUMN_COUNT); 1241 long count = (b != null && b.length > 0)? 
Bytes.toLong(b): -1; 1242 b = r.getValue(FAMILY_NAME, COLUMN_PREV); 1243 String refRegionLocation = ""; 1244 String keyRegionLocation = ""; 1245 if (b != null && b.length > 0) { 1246 try (RegionLocator rl = this.connection.getRegionLocator(tn)) { 1247 HRegionLocation hrl = rl.getRegionLocation(b); 1248 if (hrl != null) refRegionLocation = hrl.toString(); 1249 // Key here probably has trailing zeros on it. 1250 hrl = rl.getRegionLocation(key.getBytes()); 1251 if (hrl != null) keyRegionLocation = hrl.toString(); 1252 } 1253 } 1254 LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) + 1255 ", refPrevEqualsKey=" + 1256 (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) + 1257 ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) + 1258 ", ref row date=" + ts + ", jobStr=" + jobStr + 1259 ", ref row count=" + count + 1260 ", ref row regionLocation=" + refRegionLocation + 1261 ", key row regionLocation=" + keyRegionLocation); 1262 refsSb.append(comma); 1263 comma = ","; 1264 refsSb.append(Bytes.toStringBinary(ref)); 1265 } 1266 } 1267 return refsSb; 1268 } 1269 } 1270 1271 @Override 1272 public int run(String[] args) throws Exception { 1273 if (args.length != 2) { 1274 System.out.println("Usage : " + Verify.class.getSimpleName() 1275 + " <output dir> <num reducers>"); 1276 return 0; 1277 } 1278 1279 String outputDir = args[0]; 1280 int numReducers = Integer.parseInt(args[1]); 1281 1282 return run(outputDir, numReducers); 1283 } 1284 1285 public int run(String outputDir, int numReducers) throws Exception { 1286 return run(new Path(outputDir), numReducers); 1287 } 1288 1289 public int run(Path outputDir, int numReducers) throws Exception { 1290 LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers); 1291 1292 job = Job.getInstance(getConf()); 1293 1294 job.setJobName("Link Verifier"); 1295 job.setNumReduceTasks(numReducers); 1296 job.setJarByClass(getClass()); 1297 1298 setJobScannerConf(job); 1299 1300 Scan scan = new Scan(); 1301 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 1302 scan.setCaching(10000); 1303 scan.setCacheBlocks(false); 1304 if (isMultiUnevenColumnFamilies(getConf())) { 1305 scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME); 1306 scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME); 1307 } 1308 1309 TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan, 1310 VerifyMapper.class, BytesWritable.class, BytesWritable.class, job); 1311 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 1312 AbstractHBaseTool.class); 1313 1314 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 1315 1316 job.setReducerClass(VerifyReducer.class); 1317 job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class); 1318 job.setOutputKeyClass(BytesWritable.class); 1319 job.setOutputValueClass(BytesWritable.class); 1320 TextOutputFormat.setOutputPath(job, outputDir); 1321 1322 boolean success = job.waitForCompletion(true); 1323 1324 if (success) { 1325 Counters counters = job.getCounters(); 1326 if (null == counters) { 1327 LOG.warn("Counters were null, cannot verify Job completion." 
1328 + " This is commonly a result of insufficient YARN configuration."); 1329 // We don't have access to the counters to know if we have "bad" counts 1330 return 0; 1331 } 1332 1333 // If we find no unexpected values, the job didn't outright fail 1334 if (verifyUnexpectedValues(counters)) { 1335 // We didn't check referenced+unreferenced counts, leave that to visual inspection 1336 return 0; 1337 } 1338 } 1339 1340 // We failed 1341 return 1; 1342 } 1343 1344 public boolean verify(long expectedReferenced) throws Exception { 1345 if (job == null) { 1346 throw new IllegalStateException("You should call run() first"); 1347 } 1348 1349 Counters counters = job.getCounters(); 1350 if (counters == null) { 1351 LOG.info("Counters object was null, write verification cannot be performed." 1352 + " This is commonly a result of insufficient YARN configuration."); 1353 return false; 1354 } 1355 1356 // Run through each check, even if we fail one early 1357 boolean success = verifyExpectedValues(expectedReferenced, counters); 1358 1359 if (!verifyUnexpectedValues(counters)) { 1360 // We found counter objects which imply failure 1361 success = false; 1362 } 1363 1364 if (!success) { 1365 handleFailure(counters); 1366 } 1367 return success; 1368 } 1369 1370 /** 1371 * Verify the values in the Counters against the expected number of entries written. 1372 * 1373 * @param expectedReferenced 1374 * Expected number of referenced entrires 1375 * @param counters 1376 * The Job's Counters object 1377 * @return True if the values match what's expected, false otherwise 1378 */ 1379 protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) { 1380 final Counter referenced = counters.findCounter(Counts.REFERENCED); 1381 final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED); 1382 boolean success = true; 1383 1384 if (expectedReferenced != referenced.getValue()) { 1385 LOG.error("Expected referenced count does not match with actual referenced count. " + 1386 "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue()); 1387 success = false; 1388 } 1389 1390 if (unreferenced.getValue() > 0) { 1391 final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES); 1392 boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue()); 1393 LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue() 1394 + (couldBeMultiRef ? "; could be due to duplicate random numbers" : "")); 1395 success = false; 1396 } 1397 1398 return success; 1399 } 1400 1401 /** 1402 * Verify that the Counters don't contain values which indicate an outright failure from the Reducers. 1403 * 1404 * @param counters 1405 * The Job's counters 1406 * @return True if the "bad" counter objects are 0, false otherwise 1407 */ 1408 protected boolean verifyUnexpectedValues(Counters counters) { 1409 final Counter undefined = counters.findCounter(Counts.UNDEFINED); 1410 final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES); 1411 boolean success = true; 1412 1413 if (undefined.getValue() > 0) { 1414 LOG.error("Found an undefined node. 
Undefined count=" + undefined.getValue()); 1415 success = false; 1416 } 1417 1418 if (lostfamilies.getValue() > 0) { 1419 LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue()); 1420 success = false; 1421 } 1422 1423 return success; 1424 } 1425 1426 protected void handleFailure(Counters counters) throws IOException { 1427 Configuration conf = job.getConfiguration(); 1428 TableName tableName = getTableName(conf); 1429 try (Connection conn = ConnectionFactory.createConnection(conf)) { 1430 try (RegionLocator rl = conn.getRegionLocator(tableName)) { 1431 CounterGroup g = counters.getGroup("undef"); 1432 Iterator<Counter> it = g.iterator(); 1433 while (it.hasNext()) { 1434 String keyString = it.next().getName(); 1435 byte[] key = Bytes.toBytes(keyString); 1436 HRegionLocation loc = rl.getRegionLocation(key, true); 1437 LOG.error("undefined row " + keyString + ", " + loc); 1438 } 1439 g = counters.getGroup("unref"); 1440 it = g.iterator(); 1441 while (it.hasNext()) { 1442 String keyString = it.next().getName(); 1443 byte[] key = Bytes.toBytes(keyString); 1444 HRegionLocation loc = rl.getRegionLocation(key, true); 1445 LOG.error("unreferred row " + keyString + ", " + loc); 1446 } 1447 } 1448 } 1449 } 1450 } 1451 1452 /** 1453 * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration 1454 * adds more data. 1455 */ 1456 static class Loop extends Configured implements Tool { 1457 1458 private static final Logger LOG = LoggerFactory.getLogger(Loop.class); 1459 private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " + 1460 "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" + 1461 " <num walker threads>] \n" + 1462 "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" + 1463 "walkers will select and verify random flushed loop during Generation."; 1464 1465 IntegrationTestBigLinkedList it; 1466 1467 protected void runGenerator(int numMappers, long numNodes, 1468 String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers) 1469 throws Exception { 1470 Path outputPath = new Path(outputDir); 1471 UUID uuid = UUID.randomUUID(); //create a random UUID. 1472 Path generatorOutput = new Path(outputPath, uuid.toString()); 1473 1474 Generator generator = new Generator(); 1475 generator.setConf(getConf()); 1476 int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 1477 numWalkers); 1478 if (retCode > 0) { 1479 throw new RuntimeException("Generator failed with return code: " + retCode); 1480 } 1481 if (numWalkers > 0) { 1482 if (!generator.verify()) { 1483 throw new RuntimeException("Generator.verify failed"); 1484 } 1485 } 1486 } 1487 1488 protected void runVerify(String outputDir, 1489 int numReducers, long expectedNumNodes) throws Exception { 1490 Path outputPath = new Path(outputDir); 1491 UUID uuid = UUID.randomUUID(); //create a random UUID. 1492 Path iterationOutput = new Path(outputPath, uuid.toString()); 1493 1494 Verify verify = new Verify(); 1495 verify.setConf(getConf()); 1496 int retCode = verify.run(iterationOutput, numReducers); 1497 if (retCode > 0) { 1498 throw new RuntimeException("Verify.run failed with return code: " + retCode); 1499 } 1500 1501 if (!verify.verify(expectedNumNodes)) { 1502 throw new RuntimeException("Verify.verify failed"); 1503 } 1504 LOG.info("Verify finished with success. 
Total nodes=" + expectedNumNodes); 1505 } 1506 1507 @Override 1508 public int run(String[] args) throws Exception { 1509 if (args.length < 5) { 1510 System.err.println(USAGE); 1511 return 1; 1512 } 1513 try { 1514 int numIterations = Integer.parseInt(args[0]); 1515 int numMappers = Integer.parseInt(args[1]); 1516 long numNodes = Long.parseLong(args[2]); 1517 String outputDir = args[3]; 1518 int numReducers = Integer.parseInt(args[4]); 1519 Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); 1520 Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); 1521 Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]); 1522 1523 long expectedNumNodes = 0; 1524 1525 if (numIterations < 0) { 1526 numIterations = Integer.MAX_VALUE; //run indefinitely (kind of) 1527 } 1528 LOG.info("Running Loop with args:" + Arrays.deepToString(args)); 1529 for (int i = 0; i < numIterations; i++) { 1530 LOG.info("Starting iteration = " + i); 1531 runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers); 1532 expectedNumNodes += numMappers * numNodes; 1533 runVerify(outputDir, numReducers, expectedNumNodes); 1534 } 1535 return 0; 1536 } catch (NumberFormatException e) { 1537 System.err.println("Parsing loop arguments failed: " + e.getMessage()); 1538 System.err.println(USAGE); 1539 return 1; 1540 } 1541 } 1542 } 1543 1544 /** 1545 * A stand alone program that prints out portions of a list created by {@link Generator} 1546 */ 1547 private static class Print extends Configured implements Tool { 1548 @Override 1549 public int run(String[] args) throws Exception { 1550 Options options = new Options(); 1551 options.addOption("s", "start", true, "start key"); 1552 options.addOption("e", "end", true, "end key"); 1553 options.addOption("l", "limit", true, "number to print"); 1554 1555 GnuParser parser = new GnuParser(); 1556 CommandLine cmd = null; 1557 try { 1558 cmd = parser.parse(options, args); 1559 if (cmd.getArgs().length != 0) { 1560 throw new ParseException("Command takes no arguments"); 1561 } 1562 } catch (ParseException e) { 1563 System.err.println("Failed to parse command line " + e.getMessage()); 1564 System.err.println(); 1565 HelpFormatter formatter = new HelpFormatter(); 1566 formatter.printHelp(getClass().getSimpleName(), options); 1567 System.exit(-1); 1568 } 1569 1570 Connection connection = ConnectionFactory.createConnection(getConf()); 1571 Table table = connection.getTable(getTableName(getConf())); 1572 1573 Scan scan = new Scan(); 1574 scan.setBatch(10000); 1575 1576 if (cmd.hasOption("s")) 1577 scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s"))); 1578 1579 if (cmd.hasOption("e")) 1580 scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e"))); 1581 1582 int limit = 0; 1583 if (cmd.hasOption("l")) 1584 limit = Integer.parseInt(cmd.getOptionValue("l")); 1585 else 1586 limit = 100; 1587 1588 ResultScanner scanner = table.getScanner(scan); 1589 1590 CINode node = new CINode(); 1591 Result result = scanner.next(); 1592 int count = 0; 1593 while (result != null && count++ < limit) { 1594 node = getCINode(result, node); 1595 System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key), 1596 Bytes.toStringBinary(node.prev), node.count, node.client); 1597 result = scanner.next(); 1598 } 1599 scanner.close(); 1600 table.close(); 1601 connection.close(); 1602 1603 return 0; 1604 } 1605 } 1606 1607 /** 1608 * A stand alone program that deletes a single node. 
  /**
   * A standalone program that deletes a single node.
   */
  private static class Delete extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 1) {
        System.out.println("Usage: " + Delete.class.getSimpleName() + " <node to delete>");
        return 0;
      }
      byte[] val = Bytes.toBytesBinary(args[0]);

      org.apache.hadoop.hbase.client.Delete delete
          = new org.apache.hadoop.hbase.client.Delete(val);

      try (Connection connection = ConnectionFactory.createConnection(getConf());
          Table table = connection.getTable(getTableName(getConf()))) {
        table.delete(delete);
      }

      System.out.println("Delete successful");
      return 0;
    }
  }

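  // Illustrative invocation of the delete command (hedged: the row key below is a made-up
  // Bytes.toBytesBinary-encoded value):
  //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList delete \x00\x01\x02\x03
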
  abstract static class WalkerBase extends Configured {
    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
      Scan scan = new Scan();
      scan.setStartRow(startKey);
      scan.setBatch(1);
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);

      long t1 = System.currentTimeMillis();
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      long t2 = System.currentTimeMillis();
      scanner.close();

      if (result != null) {
        CINode node = getCINode(result, new CINode());
        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
        return node;
      }

      System.out.println("FSR " + (t2 - t1));

      return null;
    }

    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
      Get get = new Get(row);
      get.addColumn(FAMILY_NAME, COLUMN_PREV);
      Result result = table.get(get);
      return getCINode(result, node);
    }
  }

  /**
   * A standalone program that follows a linked list created by {@link Generator} and prints
   * timing info.
   */
  private static class Walker extends WalkerBase implements Tool {

    public Walker() {}

    @Override
    public int run(String[] args) throws IOException {

      Options options = new Options();
      options.addOption("n", "num", true, "number of queries");
      options.addOption("s", "start", true, "key to start at, binary string");
      options.addOption("l", "logevery", true, "log every N queries");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      long maxQueries = Long.MAX_VALUE;
      if (cmd.hasOption('n')) {
        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
      }
      Random rand = new SecureRandom();
      boolean isSpecificStart = cmd.hasOption('s');

      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;

      Connection connection = ConnectionFactory.createConnection(getConf());
      Table table = connection.getTable(getTableName(getConf()));
      long numQueries = 0;
      // If isSpecificStart is set, only walk one list from that particular node.
      // Note that in the case of a circular (or P-shaped) list it will walk forever, as is also
      // the case in a normal run without a startKey.
      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
        if (!isSpecificStart) {
          startKey = new byte[ROWKEY_LENGTH];
          rand.nextBytes(startKey);
        }
        CINode node = findStartNode(table, startKey);
        if (node == null && isSpecificStart) {
          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
        }
        numQueries++;
        while (node != null && node.prev.length != NO_KEY.length &&
            numQueries < maxQueries) {
          byte[] prev = node.prev;
          long t1 = System.currentTimeMillis();
          node = getNode(prev, table, node);
          long t2 = System.currentTimeMillis();
          if (logEvery > 0 && numQueries % logEvery == 0) {
            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
          }
          numQueries++;
          if (node == null) {
            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
          } else if (node.prev.length == NO_KEY.length) {
            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
          }
        }
      }
      table.close();
      connection.close();
      return 0;
    }
  }

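  // Illustrative invocation of the walker command (hedged: values are examples): walk randomly
  // chosen lists until 1000 gets have been issued, logging timing every 100 queries:
  //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList walker -n 1000 -l 100
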
  private static class Clean extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println("Usage: Clean <output dir>");
        return -1;
      }

      Path p = new Path(args[0]);
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (FileSystem fs = HFileSystem.get(conf);
          Connection conn = ConnectionFactory.createConnection(conf);
          Admin admin = conn.getAdmin()) {
        if (admin.tableExists(tableName)) {
          admin.disableTable(tableName);
          admin.deleteTable(tableName);
        }

        if (fs.exists(p)) {
          fs.delete(p, true);
        }
      }

      return 0;
    }
  }

  static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  private static CINode getCINode(Result result, CINode node) {
    node.key = Bytes.copy(result.getRow());
    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
    } else {
      node.prev = NO_KEY;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
    } else {
      node.count = -1;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
    } else {
      node.client = "";
    }
    return node;
  }

  protected IntegrationTestingUtility util;

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
  }

  @Test
  public void testContinuousIngest() throws IOException, Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
    Configuration conf = getTestingUtil(getConf()).getConfiguration();
    if (isMultiUnevenColumnFamilies(getConf())) {
      // make sure per CF flush is on
      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
          FlushAllLargeStoresPolicy.class.getName());
    }
    int ret = ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
        util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
    org.junit.Assert.assertEquals(0, ret);
  }

  private void usage() {
    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
    printCommands();
  }

  private void printCommands() {
    System.err.println("Commands:");
    System.err.println(" generator  Map only job that generates data.");
    System.err.println(" verify     A map reduce job that looks for holes. Check the return code and");
    System.err.println("            look at the counts after running. REFERENCED and UNREFERENCED");
    System.err.println("            are ok; any UNDEFINED counts are bad. Do not run it at the same");
    System.err.println("            time as the Generator.");
    System.err.println(" walker     "
        + "Standalone program that starts following a linked list & emits timing info.");
    System.err.println(" print      Standalone program that prints nodes in the linked list.");
    System.err.println(" delete     Standalone program that deletes a single node.");
    System.err.println(" loop       Program to loop through the Generator and Verify steps.");
    System.err.println(" clean      Program to clean all left-over detritus.");
    System.err.println(" search     Search for missing keys.");
    System.err.println("");
    System.err.println("General options:");
    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
    System.err.println("    Run using the <tableName> as the tablename. Defaults to "
        + DEFAULT_TABLE_NAME);
    System.err.println(" -D" + HBaseTestingUtility.REGIONS_PER_SERVER_KEY + "=<# regions>");
    System.err.println("    Create table with presplit regions per server. Defaults to "
        + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);

    System.err.println(" -DuseMob=<true|false>");
    System.err.println("    Create table so that the mob read/write path is forced. "
        + "Defaults to false");

    System.err.flush();
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    String[] args = cmd.getArgs();
    // get the class, run with the conf
    if (args.length < 1) {
      printUsage(this.getClass().getSimpleName() +
          " <general options> COMMAND [<COMMAND options>]", "General options:", "");
      printCommands();
      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
      throw new RuntimeException("Incorrect number of args.");
    }
    toRun = args[0];
    otherArgs = Arrays.copyOfRange(args, 1, args.length);
  }

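  // Illustrative programmatic launch (hedged: the argument values are examples); this mirrors what
  // main() does and the argument pattern testContinuousIngest() passes to the Loop tool:
  //   int ret = ToolRunner.run(HBaseConfiguration.create(), new IntegrationTestBigLinkedList(),
  //       new String[] { "loop", "1", "1", "2000000", "/tmp/itbll", "1" });
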
  @Override
  public int runTestFromCommandLine() throws Exception {
    Tool tool = null;
    if (toRun.equalsIgnoreCase("Generator")) {
      tool = new Generator();
    } else if (toRun.equalsIgnoreCase("Verify")) {
      tool = new Verify();
    } else if (toRun.equalsIgnoreCase("Loop")) {
      Loop loop = new Loop();
      loop.it = this;
      tool = loop;
    } else if (toRun.equalsIgnoreCase("Walker")) {
      tool = new Walker();
    } else if (toRun.equalsIgnoreCase("Print")) {
      tool = new Print();
    } else if (toRun.equalsIgnoreCase("Delete")) {
      tool = new Delete();
    } else if (toRun.equalsIgnoreCase("Clean")) {
      tool = new Clean();
    } else if (toRun.equalsIgnoreCase("Search")) {
      tool = new Search();
    } else {
      usage();
      throw new RuntimeException("Unknown arg");
    }

    return ToolRunner.run(getConf(), tool, otherArgs);
  }

  @Override
  public TableName getTablename() {
    Configuration c = getConf();
    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    if (isMultiUnevenColumnFamilies(getConf())) {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
          Bytes.toString(TINY_FAMILY_NAME));
    } else {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
    }
  }

  private static void setJobConf(Job job, int numMappers, long numNodes,
      Integer width, Integer wrapMultiplier, Integer numWalkers) {
    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
    if (width != null) {
      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
    }
    if (wrapMultiplier != null) {
      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
    }
    if (numWalkers != null) {
      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
    }
  }

  public static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
    System.exit(ret);
  }
}