001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.test; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.security.SecureRandom; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Random; 031import java.util.Set; 032import java.util.SortedSet; 033import java.util.TreeSet; 034import java.util.UUID; 035import java.util.concurrent.ThreadLocalRandom; 036import java.util.concurrent.atomic.AtomicInteger; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.conf.Configured; 039import org.apache.hadoop.fs.FileSystem; 040import org.apache.hadoop.fs.LocatedFileStatus; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.fs.RemoteIterator; 043import org.apache.hadoop.hbase.Cell; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HBaseTestingUtility; 046import org.apache.hadoop.hbase.HColumnDescriptor; 047import org.apache.hadoop.hbase.HConstants; 048import 
org.apache.hadoop.hbase.HRegionLocation; 049import org.apache.hadoop.hbase.HTableDescriptor; 050import org.apache.hadoop.hbase.IntegrationTestBase; 051import org.apache.hadoop.hbase.IntegrationTestingUtility; 052import org.apache.hadoop.hbase.MasterNotRunningException; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.client.Admin; 055import org.apache.hadoop.hbase.client.BufferedMutator; 056import org.apache.hadoop.hbase.client.BufferedMutatorParams; 057import org.apache.hadoop.hbase.client.Connection; 058import org.apache.hadoop.hbase.client.ConnectionConfiguration; 059import org.apache.hadoop.hbase.client.ConnectionFactory; 060import org.apache.hadoop.hbase.client.Get; 061import org.apache.hadoop.hbase.client.Mutation; 062import org.apache.hadoop.hbase.client.Put; 063import org.apache.hadoop.hbase.client.RegionLocator; 064import org.apache.hadoop.hbase.client.Result; 065import org.apache.hadoop.hbase.client.ResultScanner; 066import org.apache.hadoop.hbase.client.Scan; 067import org.apache.hadoop.hbase.client.ScannerCallable; 068import org.apache.hadoop.hbase.client.Table; 069import org.apache.hadoop.hbase.fs.HFileSystem; 070import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 071import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 072import org.apache.hadoop.hbase.mapreduce.TableMapper; 073import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl; 074import org.apache.hadoop.hbase.mapreduce.WALPlayer; 075import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy; 076import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; 077import org.apache.hadoop.hbase.testclassification.IntegrationTests; 078import org.apache.hadoop.hbase.util.AbstractHBaseTool; 079import org.apache.hadoop.hbase.util.Bytes; 080import org.apache.hadoop.hbase.util.CommonFSUtils; 081import org.apache.hadoop.hbase.util.Random64; 082import org.apache.hadoop.hbase.util.RegionSplitter; 083import org.apache.hadoop.hbase.wal.WALEdit; 
084import org.apache.hadoop.hbase.wal.WALKey; 085import org.apache.hadoop.io.BytesWritable; 086import org.apache.hadoop.io.NullWritable; 087import org.apache.hadoop.io.Writable; 088import org.apache.hadoop.mapreduce.Counter; 089import org.apache.hadoop.mapreduce.CounterGroup; 090import org.apache.hadoop.mapreduce.Counters; 091import org.apache.hadoop.mapreduce.InputFormat; 092import org.apache.hadoop.mapreduce.InputSplit; 093import org.apache.hadoop.mapreduce.Job; 094import org.apache.hadoop.mapreduce.JobContext; 095import org.apache.hadoop.mapreduce.Mapper; 096import org.apache.hadoop.mapreduce.RecordReader; 097import org.apache.hadoop.mapreduce.Reducer; 098import org.apache.hadoop.mapreduce.TaskAttemptContext; 099import org.apache.hadoop.mapreduce.TaskAttemptID; 100import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 101import org.apache.hadoop.mapreduce.lib.input.FileSplit; 102import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat; 103import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 104import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 105import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 106import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat; 107import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 108import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 109import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; 110import org.apache.hadoop.util.Tool; 111import org.apache.hadoop.util.ToolRunner; 112import org.junit.Test; 113import org.junit.experimental.categories.Category; 114import org.slf4j.Logger; 115import org.slf4j.LoggerFactory; 116 117import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 118import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 119import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 120import 
org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; 121import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 122import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 123import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 124 125/** 126 * <p> 127 * This is an integration test borrowed from goraci, written by Keith Turner, 128 * which is in turn inspired by the Accumulo test called continous ingest (ci). 129 * The original source code can be found here: 130 * <ul> 131 * <li>https://github.com/keith-turner/goraci</li> 132 * <li>https://github.com/enis/goraci/</li> 133 * </ul> 134 * </p> 135 * <p> 136 * Apache Accumulo [0] has a simple test suite that verifies that data is not 137 * lost at scale. This test suite is called continuous ingest. This test runs 138 * many ingest clients that continually create linked lists containing 25 139 * million nodes. At some point the clients are stopped and a map reduce job is 140 * run to ensure no linked list has a hole. A hole indicates data was lost. 141 * </p> 142 * <p> 143 * The nodes in the linked list are random. This causes each linked list to 144 * spread across the table. Therefore if one part of a table loses data, then it 145 * will be detected by references in another part of the table. 146 * </p> 147 * <p> 148 * <h3>THE ANATOMY OF THE TEST</h3> 149 * 150 * Below is rough sketch of how data is written. For specific details look at 151 * the Generator code. 152 * </p> 153 * <p> 154 * <ol> 155 * <li>Write out 1 million nodes</li> 156 * <li>Flush the client</li> 157 * <li>Write out 1 million that reference previous million</li> 158 * <li>If this is the 25th set of 1 million nodes, then update 1st set of 159 * million to point to last</li> 160 * <li>goto 1</li> 161 * </ol> 162 * </p> 163 * <p> 164 * The key is that nodes only reference flushed nodes. 
Therefore a node should
 * never reference a missing node, even if the ingest client is killed at any
 * point in time.
 * </p>
 * <p>
 * When running this test suite w/ Accumulo there is a script running in
 * parallel called the Agitator that randomly and continuously kills server
 * processes. The outcome was that many data loss bugs were found in Accumulo
 * by doing this. This test suite can also help find bugs that impact uptime
 * and stability when run for days or weeks.
 * </p>
 * <p>
 * This test suite consists of the following:
 * <ul>
 * <li>a few Java programs</li>
 * <li>a little helper script to run the java programs</li>
 * <li>a maven script to build it</li>
 * </ul>
 * </p>
 * <p>
 * When generating data, it's best to have each map task generate a multiple of
 * 25 million. The reason for this is that circular linked lists are generated
 * every 25M. Not generating a multiple of 25M will result in some nodes in the
 * linked list not having references. The loss of an unreferenced node cannot
 * be detected.
 * </p>
 * <p>
 * <h3>Below is a description of the Java programs</h3>
 * <ul>
 * <li>
 * {@code Generator} - A map only job that generates data. As stated previously, it's best to
 * generate data in multiples of 25M. An option is also available to allow concurrent walkers to
 * select and walk random flushed loops during this phase.
 * </li>
 * <li>
 * {@code Verify} - A map reduce job that looks for holes. Look at the counts after running.
 * {@code REFERENCED} and {@code UNREFERENCED} are ok, any {@code UNDEFINED} counts are bad. Do not
 * run at the same time as the Generator.
 * </li>
 * <li>
 * {@code Walker} - A standalone program that starts following a linked list and emits timing info.
205 * </li> 206 * <li> 207 * {@code Print} - A standalone program that prints nodes in the linked list 208 * </li> 209 * <li> 210 * {@code Delete} - A standalone program that deletes a single node 211 * </li> 212 * </ul> 213 * 214 * This class can be run as a unit test, as an integration test, or from the command line 215 * </p> 216 * <p> 217 * ex: 218 * <pre> 219 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 220 * loop 2 1 100000 /temp 1 1000 50 1 0 221 * </pre> 222 * </p> 223 */ 224@Category(IntegrationTests.class) 225public class IntegrationTestBigLinkedList extends IntegrationTestBase { 226 protected static final byte[] NO_KEY = new byte[1]; 227 228 protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table"; 229 230 protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList"; 231 232 protected static byte[] FAMILY_NAME = Bytes.toBytes("meta"); 233 private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big"); 234 private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny"); 235 236 //link to the id of the prev node in the linked list 237 protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev"); 238 239 //identifier of the mapred task that generated this row 240 protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client"); 241 242 //the id of the row within the same client. 243 protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count"); 244 245 /** How many rows to write per map task. 
This has to be a multiple of 25M */ 246 private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY 247 = "IntegrationTestBigLinkedList.generator.num_rows"; 248 249 private static final String GENERATOR_NUM_MAPPERS_KEY 250 = "IntegrationTestBigLinkedList.generator.map.tasks"; 251 252 private static final String GENERATOR_WIDTH_KEY 253 = "IntegrationTestBigLinkedList.generator.width"; 254 255 private static final String GENERATOR_WRAP_KEY 256 = "IntegrationTestBigLinkedList.generator.wrap"; 257 258 private static final String CONCURRENT_WALKER_KEY 259 = "IntegrationTestBigLinkedList.generator.concurrentwalkers"; 260 261 protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster 262 263 private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters 264 265 private static final int WIDTH_DEFAULT = 1000000; 266 private static final int WRAP_DEFAULT = 25; 267 private static final int ROWKEY_LENGTH = 16; 268 269 private static final int CONCURRENT_WALKER_DEFAULT = 0; 270 271 protected String toRun; 272 protected String[] otherArgs; 273 274 static class CINode { 275 byte[] key; 276 byte[] prev; 277 String client; 278 long count; 279 } 280 281 /** 282 * A Map only job that generates random linked list and stores them. 283 */ 284 static class Generator extends Configured implements Tool { 285 286 private static final Logger LOG = LoggerFactory.getLogger(Generator.class); 287 288 /** 289 * Set this configuration if you want to test single-column family flush works. If set, we will 290 * add a big column family and a small column family on either side of the usual ITBLL 'meta' 291 * column family. When we write out the ITBLL, we will also add to the big column family a value 292 * bigger than that for ITBLL and for small, something way smaller. The idea is that when 293 * flush-by-column family rather than by region is enabled, we can see if ITBLL is broke in any 294 * way. 
Here is how you would pass it: 295 * <p> 296 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 297 * -Dgenerator.multiple.columnfamilies=true generator 1 10 g 298 */ 299 public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY = 300 "generator.multiple.columnfamilies"; 301 302 /** 303 * Set this configuration if you want to scale up the size of test data quickly. 304 * <p> 305 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 306 * -Dgenerator.big.family.value.size=1024 generator 1 10 output 307 */ 308 public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size"; 309 310 311 public static enum Counts { 312 SUCCESS, TERMINATING, UNDEFINED, IOEXCEPTION 313 } 314 315 public static final String USAGE = "Usage : " + Generator.class.getSimpleName() + 316 " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" + 317 " <num walker threads>] \n" + 318 "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" + 319 "walkers will verify random flushed loop during Generation."; 320 321 public Job job; 322 323 static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> { 324 static class GeneratorInputSplit extends InputSplit implements Writable { 325 @Override 326 public long getLength() throws IOException, InterruptedException { 327 return 1; 328 } 329 @Override 330 public String[] getLocations() throws IOException, InterruptedException { 331 return new String[0]; 332 } 333 @Override 334 public void readFields(DataInput arg0) throws IOException { 335 } 336 @Override 337 public void write(DataOutput arg0) throws IOException { 338 } 339 } 340 341 static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> { 342 private long count; 343 private long numNodes; 344 private Random64 rand; 345 346 @Override 347 public void close() throws IOException { 348 } 349 350 @Override 351 public BytesWritable 
getCurrentKey() throws IOException, InterruptedException { 352 byte[] bytes = new byte[ROWKEY_LENGTH]; 353 rand.nextBytes(bytes); 354 return new BytesWritable(bytes); 355 } 356 357 @Override 358 public NullWritable getCurrentValue() throws IOException, InterruptedException { 359 return NullWritable.get(); 360 } 361 362 @Override 363 public float getProgress() throws IOException, InterruptedException { 364 return (float)(count / (double)numNodes); 365 } 366 367 @Override 368 public void initialize(InputSplit arg0, TaskAttemptContext context) 369 throws IOException, InterruptedException { 370 numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000); 371 // Use Random64 to avoid issue described in HBASE-21256. 372 rand = new Random64(); 373 } 374 375 @Override 376 public boolean nextKeyValue() throws IOException, InterruptedException { 377 return count++ < numNodes; 378 } 379 380 } 381 382 @Override 383 public RecordReader<BytesWritable,NullWritable> createRecordReader( 384 InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 385 GeneratorRecordReader rr = new GeneratorRecordReader(); 386 rr.initialize(split, context); 387 return rr; 388 } 389 390 @Override 391 public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { 392 int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1); 393 394 ArrayList<InputSplit> splits = new ArrayList<>(numMappers); 395 396 for (int i = 0; i < numMappers; i++) { 397 splits.add(new GeneratorInputSplit()); 398 } 399 400 return splits; 401 } 402 } 403 404 /** Ensure output files from prev-job go to map inputs for current job */ 405 static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> { 406 @Override 407 protected boolean isSplitable(JobContext context, Path filename) { 408 return false; 409 } 410 } 411 412 /** 413 * Some ASCII art time: 414 * <p> 415 * [ . . . 
] represents one batch of random longs of length WIDTH 416 * <pre> 417 * _________________________ 418 * | ______ | 419 * | | || 420 * .-+-----------------+-----.|| 421 * | | | ||| 422 * first = [ . . . . . . . . . . . ] ||| 423 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 424 * | | | | | | | | | | | ||| 425 * prev = [ . . . . . . . . . . . ] ||| 426 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 427 * | | | | | | | | | | | ||| 428 * current = [ . . . . . . . . . . . ] ||| 429 * ||| 430 * ... ||| 431 * ||| 432 * last = [ . . . . . . . . . . . ] ||| 433 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____||| 434 * | |________|| 435 * |___________________________| 436 * </pre> 437 */ 438 439 static class GeneratorMapper 440 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 441 442 byte[][] first = null; 443 byte[][] prev = null; 444 byte[][] current = null; 445 byte[] id; 446 long count = 0; 447 int i; 448 BufferedMutator mutator; 449 Connection connection; 450 long numNodes; 451 long wrap; 452 int width; 453 boolean multipleUnevenColumnFamilies; 454 byte[] tinyValue = new byte[] { 't' }; 455 byte[] bigValue = null; 456 Configuration conf; 457 458 volatile boolean walkersStop; 459 int numWalkers; 460 volatile List<Long> flushedLoops = new ArrayList<>(); 461 List<Thread> walkers = new ArrayList<>(); 462 463 @Override 464 protected void setup(Context context) throws IOException, InterruptedException { 465 id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID()); 466 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 467 instantiateHTable(); 468 this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT); 469 current = new byte[this.width][]; 470 int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT); 471 this.wrap = (long)wrapMultiplier * width; 472 this.numNodes = context.getConfiguration().getLong( 473 GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT); 474 if 
(this.numNodes < this.wrap) { 475 this.wrap = this.numNodes; 476 } 477 this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); 478 this.numWalkers = context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT); 479 this.walkersStop = false; 480 this.conf = context.getConfiguration(); 481 482 if (multipleUnevenColumnFamilies) { 483 int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256); 484 int limit = context.getConfiguration().getInt( 485 ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 486 ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT); 487 488 Preconditions.checkArgument( 489 n <= limit, 490 "%s(%s) > %s(%s)", 491 BIG_FAMILY_VALUE_SIZE_KEY, n, ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit); 492 493 bigValue = new byte[n]; 494 ThreadLocalRandom.current().nextBytes(bigValue); 495 LOG.info("Create a bigValue with " + n + " bytes."); 496 } 497 498 Preconditions.checkArgument( 499 numNodes > 0, 500 "numNodes(%s) <= 0", 501 numNodes); 502 Preconditions.checkArgument( 503 numNodes % width == 0, 504 "numNodes(%s) mod width(%s) != 0", 505 numNodes, width); 506 Preconditions.checkArgument( 507 numNodes % wrap == 0, 508 "numNodes(%s) mod wrap(%s) != 0", 509 numNodes, wrap 510 ); 511 } 512 513 protected void instantiateHTable() throws IOException { 514 mutator = connection.getBufferedMutator( 515 new BufferedMutatorParams(getTableName(connection.getConfiguration())) 516 .writeBufferSize(4 * 1024 * 1024)); 517 } 518 519 @Override 520 protected void cleanup(Context context) throws IOException ,InterruptedException { 521 joinWalkers(); 522 mutator.close(); 523 connection.close(); 524 } 525 526 @Override 527 protected void map(BytesWritable key, NullWritable value, Context output) throws IOException { 528 current[i] = new byte[key.getLength()]; 529 System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength()); 530 if (++i == current.length) { 531 LOG.debug("Persisting 
current.length={}, count={}, id={}, current={}, i=", 532 current.length, count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i); 533 persist(output, count, prev, current, id); 534 i = 0; 535 536 if (first == null) { 537 first = current; 538 } 539 prev = current; 540 current = new byte[this.width][]; 541 542 count += current.length; 543 output.setStatus("Count " + count); 544 545 if (count % wrap == 0) { 546 // this block of code turns the 1 million linked list of length 25 into one giant 547 //circular linked list of 25 million 548 circularLeftShift(first); 549 persist(output, -1, prev, first, null); 550 // At this point the entire loop has been flushed so we can add one of its nodes to the 551 // concurrent walker 552 if (numWalkers > 0) { 553 addFlushed(key.getBytes()); 554 if (walkers.isEmpty()) { 555 startWalkers(numWalkers, conf, output); 556 } 557 } 558 first = null; 559 prev = null; 560 } 561 } 562 } 563 564 private static <T> void circularLeftShift(T[] first) { 565 T ez = first[0]; 566 System.arraycopy(first, 1, first, 0, first.length - 1); 567 first[first.length - 1] = ez; 568 } 569 570 private void addFlushed(byte[] rowKey) { 571 synchronized (flushedLoops) { 572 flushedLoops.add(Bytes.toLong(rowKey)); 573 flushedLoops.notifyAll(); 574 } 575 } 576 577 protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id) 578 throws IOException { 579 for (int i = 0; i < current.length; i++) { 580 581 if (i % 100 == 0) { 582 // Tickle progress every so often else maprunner will think us hung 583 output.progress(); 584 } 585 586 Put put = new Put(current[i]); 587 put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]); 588 589 if (count >= 0) { 590 put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i)); 591 } 592 if (id != null) { 593 put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id); 594 } 595 // See if we are to write multiple columns. 
596 if (this.multipleUnevenColumnFamilies) { 597 // Use any column name. 598 put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue); 599 // Use any column name. 600 put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue); 601 } 602 mutator.mutate(put); 603 } 604 605 mutator.flush(); 606 } 607 608 private void startWalkers(int numWalkers, Configuration conf, Context context) { 609 LOG.info("Starting " + numWalkers + " concurrent walkers"); 610 for (int i = 0; i < numWalkers; i++) { 611 Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context)); 612 walker.start(); 613 walkers.add(walker); 614 } 615 } 616 617 private void joinWalkers() { 618 walkersStop = true; 619 synchronized (flushedLoops) { 620 flushedLoops.notifyAll(); 621 } 622 for (Thread walker : walkers) { 623 try { 624 walker.join(); 625 } catch (InterruptedException e) { 626 // no-op 627 } 628 } 629 } 630 631 /** 632 * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by 633 * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are 634 * configured to only log erroneous nodes. 
635 */ 636 637 public class ContinuousConcurrentWalker implements Runnable { 638 639 ConcurrentWalker walker; 640 Configuration conf; 641 Context context; 642 Random rand; 643 644 public ContinuousConcurrentWalker(Configuration conf, Context context) { 645 this.conf = conf; 646 this.context = context; 647 rand = new Random(); 648 } 649 650 @Override 651 public void run() { 652 while (!walkersStop) { 653 try { 654 long node = selectLoop(); 655 try { 656 walkLoop(node); 657 } catch (IOException e) { 658 context.getCounter(Counts.IOEXCEPTION).increment(1l); 659 return; 660 } 661 } catch (InterruptedException e) { 662 return; 663 } 664 } 665 } 666 667 private void walkLoop(long node) throws IOException { 668 walker = new ConcurrentWalker(context); 669 walker.setConf(conf); 670 walker.run(node, wrap); 671 } 672 673 private long selectLoop () throws InterruptedException{ 674 synchronized (flushedLoops) { 675 while (flushedLoops.isEmpty() && !walkersStop) { 676 flushedLoops.wait(); 677 } 678 if (walkersStop) { 679 throw new InterruptedException(); 680 } 681 return flushedLoops.get(rand.nextInt(flushedLoops.size())); 682 } 683 } 684 } 685 686 public static class ConcurrentWalker extends WalkerBase { 687 688 Context context; 689 690 public ConcurrentWalker(Context context) {this.context = context;} 691 692 public void run(long startKeyIn, long maxQueriesIn) throws IOException { 693 694 long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE; 695 byte[] startKey = Bytes.toBytes(startKeyIn); 696 697 Connection connection = ConnectionFactory.createConnection(getConf()); 698 Table table = connection.getTable(getTableName(getConf())); 699 long numQueries = 0; 700 // If isSpecificStart is set, only walk one list from that particular node. 701 // Note that in case of circular (or P-shaped) list it will walk forever, as is 702 // the case in normal run without startKey. 
703 704 CINode node = findStartNode(table, startKey); 705 if (node == null) { 706 LOG.error("Start node not found: " + Bytes.toStringBinary(startKey)); 707 throw new IOException("Start node not found: " + startKeyIn); 708 } 709 while (numQueries < maxQueries) { 710 numQueries++; 711 byte[] prev = node.prev; 712 long t1 = System.currentTimeMillis(); 713 node = getNode(prev, table, node); 714 long t2 = System.currentTimeMillis(); 715 if (node == null) { 716 LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev)); 717 context.getCounter(Counts.UNDEFINED).increment(1l); 718 } else if (node.prev.length == NO_KEY.length) { 719 LOG.error("ConcurrentWalker found TERMINATING NODE: " + 720 Bytes.toStringBinary(node.key)); 721 context.getCounter(Counts.TERMINATING).increment(1l); 722 } else { 723 // Increment for successful walk 724 context.getCounter(Counts.SUCCESS).increment(1l); 725 } 726 } 727 table.close(); 728 connection.close(); 729 } 730 } 731 } 732 733 @Override 734 public int run(String[] args) throws Exception { 735 if (args.length < 3) { 736 System.err.println(USAGE); 737 return 1; 738 } 739 try { 740 int numMappers = Integer.parseInt(args[0]); 741 long numNodes = Long.parseLong(args[1]); 742 Path tmpOutput = new Path(args[2]); 743 Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]); 744 Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]); 745 Integer numWalkers = (args.length < 6) ? 
null : Integer.parseInt(args[5]); 746 return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 747 } catch (NumberFormatException e) { 748 System.err.println("Parsing generator arguments failed: " + e.getMessage()); 749 System.err.println(USAGE); 750 return 1; 751 } 752 } 753 754 protected void createSchema() throws IOException { 755 Configuration conf = getConf(); 756 TableName tableName = getTableName(conf); 757 try (Connection conn = ConnectionFactory.createConnection(conf); 758 Admin admin = conn.getAdmin()) { 759 if (!admin.tableExists(tableName)) { 760 HTableDescriptor htd = new HTableDescriptor(getTableName(getConf())); 761 htd.addFamily(new HColumnDescriptor(FAMILY_NAME)); 762 // Always add these families. Just skip writing to them when we do not test per CF flush. 763 htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME)); 764 htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME)); 765 // if -DuseMob=true force all data through mob path. 766 if (conf.getBoolean("useMob", false)) { 767 for (HColumnDescriptor hcd : htd.getColumnFamilies() ) { 768 hcd.setMobEnabled(true); 769 hcd.setMobThreshold(4); 770 } 771 } 772 773 // If we want to pre-split compute how many splits. 
774 if (conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY, 775 HBaseTestingUtility.PRESPLIT_TEST_TABLE)) { 776 int numberOfServers = admin.getRegionServers().size(); 777 if (numberOfServers == 0) { 778 throw new IllegalStateException("No live regionservers"); 779 } 780 int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY, 781 HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER); 782 int totalNumberOfRegions = numberOfServers * regionsPerServer; 783 LOG.info("Number of live regionservers: " + numberOfServers + ", " + 784 "pre-splitting table into " + totalNumberOfRegions + " regions " + 785 "(default regions per server: " + regionsPerServer + ")"); 786 787 788 byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions); 789 790 admin.createTable(htd, splits); 791 } else { 792 // Looks like we're just letting things play out. 793 // Create a table with on region by default. 794 // This will make the splitting work hard. 795 admin.createTable(htd); 796 } 797 } 798 } catch (MasterNotRunningException e) { 799 LOG.error("Master not running", e); 800 throw new IOException(e); 801 } 802 } 803 804 public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, 805 Integer width, Integer wrapMultiplier, Integer numWalkers) 806 throws Exception { 807 LOG.info("Running RandomInputGenerator with numMappers=" + numMappers 808 + ", numNodes=" + numNodes); 809 Job job = Job.getInstance(getConf()); 810 811 job.setJobName("Random Input Generator"); 812 job.setNumReduceTasks(0); 813 job.setJarByClass(getClass()); 814 815 job.setInputFormatClass(GeneratorInputFormat.class); 816 job.setOutputKeyClass(BytesWritable.class); 817 job.setOutputValueClass(NullWritable.class); 818 819 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 820 821 job.setMapperClass(Mapper.class); //identity mapper 822 823 FileOutputFormat.setOutputPath(job, tmpOutput); 824 
job.setOutputFormatClass(SequenceFileOutputFormat.class); 825 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class); 826 827 boolean success = jobCompletion(job); 828 829 return success ? 0 : 1; 830 } 831 832 public int runGenerator(int numMappers, long numNodes, Path tmpOutput, 833 Integer width, Integer wrapMultiplier, Integer numWalkers) 834 throws Exception { 835 LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes); 836 createSchema(); 837 job = Job.getInstance(getConf()); 838 839 job.setJobName("Link Generator"); 840 job.setNumReduceTasks(0); 841 job.setJarByClass(getClass()); 842 843 FileInputFormat.setInputPaths(job, tmpOutput); 844 job.setInputFormatClass(OneFilePerMapperSFIF.class); 845 job.setOutputKeyClass(NullWritable.class); 846 job.setOutputValueClass(NullWritable.class); 847 848 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 849 850 setMapperForGenerator(job); 851 852 job.setOutputFormatClass(NullOutputFormat.class); 853 854 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 855 TableMapReduceUtil.addDependencyJars(job); 856 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 857 AbstractHBaseTool.class); 858 TableMapReduceUtil.initCredentials(job); 859 860 boolean success = jobCompletion(job); 861 862 return success ? 
0 : 1; 863 } 864 865 protected boolean jobCompletion(Job job) throws IOException, InterruptedException, 866 ClassNotFoundException { 867 boolean success = job.waitForCompletion(true); 868 return success; 869 } 870 871 protected void setMapperForGenerator(Job job) { 872 job.setMapperClass(GeneratorMapper.class); 873 } 874 875 public int run(int numMappers, long numNodes, Path tmpOutput, 876 Integer width, Integer wrapMultiplier, Integer numWalkers) 877 throws Exception { 878 int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, 879 numWalkers); 880 if (ret > 0) { 881 return ret; 882 } 883 return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 884 } 885 886 public boolean verify() { 887 try { 888 Counters counters = job.getCounters(); 889 if (counters == null) { 890 LOG.info("Counters object was null, Generator verification cannot be performed." 891 + " This is commonly a result of insufficient YARN configuration."); 892 return false; 893 } 894 895 if (counters.findCounter(Counts.TERMINATING).getValue() > 0 || 896 counters.findCounter(Counts.UNDEFINED).getValue() > 0 || 897 counters.findCounter(Counts.IOEXCEPTION).getValue() > 0) { 898 LOG.error("Concurrent walker failed to verify during Generation phase"); 899 LOG.error("TERMINATING nodes: " + counters.findCounter(Counts.TERMINATING).getValue()); 900 LOG.error("UNDEFINED nodes: " + counters.findCounter(Counts.UNDEFINED).getValue()); 901 LOG.error("IOEXCEPTION nodes: " + counters.findCounter(Counts.IOEXCEPTION).getValue()); 902 return false; 903 } 904 } catch (IOException e) { 905 LOG.info("Generator verification could not find counter"); 906 return false; 907 } 908 return true; 909 } 910 } 911 912 /** 913 * Tool to search missing rows in WALs and hfiles. 914 * Pass in file or dir of keys to search for. Key file must have been written by Verify step 915 * (we depend on the format it writes out. 
We'll read them in and then search in hbase 916 * WALs and oldWALs dirs (Some of this is TODO). 917 */ 918 static class Search extends Configured implements Tool { 919 private static final Logger LOG = LoggerFactory.getLogger(Search.class); 920 protected Job job; 921 922 private static void printUsage(final String error) { 923 if (error != null && error.length() > 0) System.out.println("ERROR: " + error); 924 System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]"); 925 } 926 927 @Override 928 public int run(String[] args) throws Exception { 929 if (args.length < 1 || args.length > 2) { 930 printUsage(null); 931 return 1; 932 } 933 Path inputDir = new Path(args[0]); 934 int numMappers = 1; 935 if (args.length > 1) { 936 numMappers = Integer.parseInt(args[1]); 937 } 938 return run(inputDir, numMappers); 939 } 940 941 /** 942 * WALPlayer override that searches for keys loaded in the setup. 943 */ 944 public static class WALSearcher extends WALPlayer { 945 public WALSearcher(Configuration conf) { 946 super(conf); 947 } 948 949 /** 950 * The actual searcher mapper. 951 */ 952 public static class WALMapperSearcher extends WALMapper { 953 private SortedSet<byte []> keysToFind; 954 private AtomicInteger rows = new AtomicInteger(0); 955 956 @Override 957 public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) 958 throws IOException { 959 super.setup(context); 960 try { 961 this.keysToFind = readKeysToSearch(context.getConfiguration()); 962 LOG.info("Loaded keys to find: count=" + this.keysToFind.size()); 963 } catch (InterruptedException e) { 964 throw new InterruptedIOException(e.toString()); 965 } 966 } 967 968 @Override 969 protected boolean filter(Context context, Cell cell) { 970 // TODO: Can I do a better compare than this copying out key? 
971 byte [] row = new byte [cell.getRowLength()]; 972 System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength()); 973 boolean b = this.keysToFind.contains(row); 974 if (b) { 975 String keyStr = Bytes.toStringBinary(row); 976 try { 977 LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey()); 978 } catch (IOException|InterruptedException e) { 979 LOG.warn(e.toString(), e); 980 } 981 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 982 context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); 983 } 984 context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1); 985 } 986 return b; 987 } 988 } 989 990 // Put in place the above WALMapperSearcher. 991 @Override 992 public Job createSubmittableJob(String[] args) throws IOException { 993 Job job = super.createSubmittableJob(args); 994 // Call my class instead. 995 job.setJarByClass(WALMapperSearcher.class); 996 job.setMapperClass(WALMapperSearcher.class); 997 job.setOutputFormatClass(NullOutputFormat.class); 998 return job; 999 } 1000 } 1001 1002 static final String FOUND_GROUP_KEY = "Found"; 1003 static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir"; 1004 1005 public int run(Path inputDir, int numMappers) throws Exception { 1006 getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString()); 1007 SortedSet<byte []> keys = readKeysToSearch(getConf()); 1008 if (keys.isEmpty()) throw new RuntimeException("No keys to find"); 1009 LOG.info("Count of keys to find: " + keys.size()); 1010 for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key)); 1011 // Now read all WALs. In two dirs. Presumes certain layout. 
1012 Path walsDir = new Path( 1013 CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME); 1014 Path oldWalsDir = new Path( 1015 CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME); 1016 LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers + 1017 " against " + getConf().get(HConstants.HBASE_DIR)); 1018 int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()), 1019 new String [] {walsDir.toString(), ""}); 1020 if (ret != 0) { 1021 return ret; 1022 } 1023 return ToolRunner.run(getConf(), new WALSearcher(getConf()), 1024 new String [] {oldWalsDir.toString(), ""}); 1025 } 1026 1027 static SortedSet<byte []> readKeysToSearch(final Configuration conf) 1028 throws IOException, InterruptedException { 1029 Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY)); 1030 FileSystem fs = FileSystem.get(conf); 1031 SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1032 if (!fs.exists(keysInputDir)) { 1033 throw new FileNotFoundException(keysInputDir.toString()); 1034 } 1035 if (!fs.isDirectory(keysInputDir)) { 1036 throw new UnsupportedOperationException("TODO"); 1037 } else { 1038 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false); 1039 while(iterator.hasNext()) { 1040 LocatedFileStatus keyFileStatus = iterator.next(); 1041 // Skip "_SUCCESS" file. 1042 if (keyFileStatus.getPath().getName().startsWith("_")) continue; 1043 result.addAll(readFileToSearch(conf, fs, keyFileStatus)); 1044 } 1045 } 1046 return result; 1047 } 1048 1049 private static SortedSet<byte[]> readFileToSearch(final Configuration conf, 1050 final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException, 1051 InterruptedException { 1052 SortedSet<byte []> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1053 // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is 1054 // what is missing. 
1055 TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); 1056 try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr = 1057 new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) { 1058 InputSplit is = 1059 new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {}); 1060 rr.initialize(is, context); 1061 while (rr.nextKeyValue()) { 1062 rr.getCurrentKey(); 1063 BytesWritable bw = rr.getCurrentValue(); 1064 if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) { 1065 byte[] key = new byte[rr.getCurrentKey().getLength()]; 1066 System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey() 1067 .getLength()); 1068 result.add(key); 1069 } 1070 } 1071 } 1072 return result; 1073 } 1074 } 1075 1076 /** 1077 * A Map Reduce job that verifies that the linked lists generated by 1078 * {@link Generator} do not have any holes. 1079 */ 1080 static class Verify extends Configured implements Tool { 1081 1082 private static final Logger LOG = LoggerFactory.getLogger(Verify.class); 1083 protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 }); 1084 protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 }); 1085 1086 protected Job job; 1087 1088 public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> { 1089 private BytesWritable row = new BytesWritable(); 1090 private BytesWritable ref = new BytesWritable(); 1091 1092 private boolean multipleUnevenColumnFamilies; 1093 1094 @Override 1095 protected void setup( 1096 Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context) 1097 throws IOException, InterruptedException { 1098 this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); 1099 } 1100 1101 @Override 1102 protected void map(ImmutableBytesWritable key, Result value, Context context) 1103 throws IOException 
,InterruptedException { 1104 byte[] rowKey = key.get(); 1105 row.set(rowKey, 0, rowKey.length); 1106 if (multipleUnevenColumnFamilies 1107 && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn( 1108 TINY_FAMILY_NAME, TINY_FAMILY_NAME))) { 1109 context.write(row, DEF_LOST_FAMILIES); 1110 } else { 1111 context.write(row, DEF); 1112 } 1113 byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV); 1114 if (prev != null && prev.length > 0) { 1115 ref.set(prev, 0, prev.length); 1116 context.write(ref, row); 1117 } else { 1118 LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey))); 1119 } 1120 } 1121 } 1122 1123 /** 1124 * Don't change the order of these enums. Their ordinals are used as type flag when we emit 1125 * problems found from the reducer. 1126 */ 1127 public static enum Counts { 1128 UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES, 1129 LOST_FAMILIES 1130 } 1131 1132 /** 1133 * Per reducer, we output problem rows as byte arrasy so can be used as input for 1134 * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag 1135 * saying what sort of emission it is. Flag is the Count enum ordinal as a short. 
1136 */ 1137 public static class VerifyReducer extends 1138 Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> { 1139 private ArrayList<byte[]> refs = new ArrayList<>(); 1140 private final BytesWritable UNREF = new BytesWritable(addPrefixFlag( 1141 Counts.UNREFERENCED.ordinal(), new byte[] {})); 1142 private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag( 1143 Counts.LOST_FAMILIES.ordinal(), new byte[] {})); 1144 1145 private AtomicInteger rows = new AtomicInteger(0); 1146 private Connection connection; 1147 1148 @Override 1149 protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1150 throws IOException, InterruptedException { 1151 super.setup(context); 1152 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 1153 } 1154 1155 @Override 1156 protected void cleanup( 1157 Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1158 throws IOException, InterruptedException { 1159 if (this.connection != null) { 1160 this.connection.close(); 1161 } 1162 super.cleanup(context); 1163 } 1164 1165 /** 1166 * @param ordinal 1167 * @param r 1168 * @return Return new byte array that has <code>ordinal</code> as prefix on front taking up 1169 * Bytes.SIZEOF_SHORT bytes followed by <code>r</code> 1170 */ 1171 public static byte[] addPrefixFlag(final int ordinal, final byte [] r) { 1172 byte[] prefix = Bytes.toBytes((short)ordinal); 1173 if (prefix.length != Bytes.SIZEOF_SHORT) { 1174 throw new RuntimeException("Unexpected size: " + prefix.length); 1175 } 1176 byte[] result = new byte[prefix.length + r.length]; 1177 System.arraycopy(prefix, 0, result, 0, prefix.length); 1178 System.arraycopy(r, 0, result, prefix.length, r.length); 1179 return result; 1180 } 1181 1182 /** 1183 * @param bs 1184 * @return Type from the Counts enum of this row. 
Reads prefix added by 1185 * {@link #addPrefixFlag(int, byte[])} 1186 */ 1187 public static Counts whichType(final byte [] bs) { 1188 int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT); 1189 return Counts.values()[ordinal]; 1190 } 1191 1192 /** 1193 * @param bw 1194 * @return Row bytes minus the type flag. 1195 */ 1196 public static byte[] getRowOnly(BytesWritable bw) { 1197 byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT]; 1198 System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length); 1199 return bytes; 1200 } 1201 1202 @Override 1203 public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context) 1204 throws IOException, InterruptedException { 1205 int defCount = 0; 1206 boolean lostFamilies = false; 1207 refs.clear(); 1208 for (BytesWritable type : values) { 1209 if (type.getLength() == DEF.getLength()) { 1210 defCount++; 1211 if (type.getBytes()[0] == 1) { 1212 lostFamilies = true; 1213 } 1214 } else { 1215 byte[] bytes = new byte[type.getLength()]; 1216 System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength()); 1217 refs.add(bytes); 1218 } 1219 } 1220 1221 // TODO check for more than one def, should not happen 1222 StringBuilder refsSb = null; 1223 if (defCount == 0 || refs.size() != 1) { 1224 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1225 refsSb = dumpExtraInfoOnRefs(key, context, refs); 1226 LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" + 1227 (refsSb != null? refsSb.toString(): "")); 1228 } 1229 if (lostFamilies) { 1230 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1231 LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families"); 1232 context.getCounter(Counts.LOST_FAMILIES).increment(1); 1233 context.write(key, LOSTFAM); 1234 } 1235 1236 if (defCount == 0 && refs.size() > 0) { 1237 // This is bad, found a node that is referenced but not defined. 
It must have been 1238 // lost, emit some info about this node for debugging purposes. 1239 // Write out a line per reference. If more than one, flag it.; 1240 for (int i = 0; i < refs.size(); i++) { 1241 byte[] bs = refs.get(i); 1242 int ordinal; 1243 if (i <= 0) { 1244 ordinal = Counts.UNDEFINED.ordinal(); 1245 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1246 context.getCounter(Counts.UNDEFINED).increment(1); 1247 } else { 1248 ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal(); 1249 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1250 } 1251 } 1252 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1253 // Print out missing row; doing get on reference gives info on when the referencer 1254 // was added which can help a little debugging. This info is only available in mapper 1255 // output -- the 'Linked List error Key...' log message above. What we emit here is 1256 // useless for debugging. 1257 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1258 context.getCounter("undef", keyString).increment(1); 1259 } 1260 } else if (defCount > 0 && refs.isEmpty()) { 1261 // node is defined but not referenced 1262 context.write(key, UNREF); 1263 context.getCounter(Counts.UNREFERENCED).increment(1); 1264 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1265 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1266 context.getCounter("unref", keyString).increment(1); 1267 } 1268 } else { 1269 if (refs.size() > 1) { 1270 // Skip first reference. 1271 for (int i = 1; i < refs.size(); i++) { 1272 context.write(key, 1273 new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i)))); 1274 } 1275 context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1); 1276 } 1277 // node is defined and referenced 1278 context.getCounter(Counts.REFERENCED).increment(1); 1279 } 1280 } 1281 1282 /** 1283 * Dump out extra info around references if there are any. Helps debugging. 
1284 * @return StringBuilder filled with references if any. 1285 * @throws IOException 1286 */ 1287 private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context, 1288 final List<byte []> refs) 1289 throws IOException { 1290 StringBuilder refsSb = null; 1291 if (refs.isEmpty()) return refsSb; 1292 refsSb = new StringBuilder(); 1293 String comma = ""; 1294 // If a row is a reference but has no define, print the content of the row that has 1295 // this row as a 'prev'; it will help debug. The missing row was written just before 1296 // the row we are dumping out here. 1297 TableName tn = getTableName(context.getConfiguration()); 1298 try (Table t = this.connection.getTable(tn)) { 1299 for (byte [] ref : refs) { 1300 Result r = t.get(new Get(ref)); 1301 List<Cell> cells = r.listCells(); 1302 String ts = (cells != null && !cells.isEmpty())? 1303 new java.util.Date(cells.get(0).getTimestamp()).toString(): ""; 1304 byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT); 1305 String jobStr = (b != null && b.length > 0)? Bytes.toString(b): ""; 1306 b = r.getValue(FAMILY_NAME, COLUMN_COUNT); 1307 long count = (b != null && b.length > 0)? Bytes.toLong(b): -1; 1308 b = r.getValue(FAMILY_NAME, COLUMN_PREV); 1309 String refRegionLocation = ""; 1310 String keyRegionLocation = ""; 1311 if (b != null && b.length > 0) { 1312 try (RegionLocator rl = this.connection.getRegionLocator(tn)) { 1313 HRegionLocation hrl = rl.getRegionLocation(b); 1314 if (hrl != null) refRegionLocation = hrl.toString(); 1315 // Key here probably has trailing zeros on it. 
1316 hrl = rl.getRegionLocation(key.getBytes()); 1317 if (hrl != null) keyRegionLocation = hrl.toString(); 1318 } 1319 } 1320 LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) + 1321 ", refPrevEqualsKey=" + 1322 (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) + 1323 ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) + 1324 ", ref row date=" + ts + ", jobStr=" + jobStr + 1325 ", ref row count=" + count + 1326 ", ref row regionLocation=" + refRegionLocation + 1327 ", key row regionLocation=" + keyRegionLocation); 1328 refsSb.append(comma); 1329 comma = ","; 1330 refsSb.append(Bytes.toStringBinary(ref)); 1331 } 1332 } 1333 return refsSb; 1334 } 1335 } 1336 1337 @Override 1338 public int run(String[] args) throws Exception { 1339 if (args.length != 2) { 1340 System.out.println("Usage : " + Verify.class.getSimpleName() 1341 + " <output dir> <num reducers>"); 1342 return 0; 1343 } 1344 1345 String outputDir = args[0]; 1346 int numReducers = Integer.parseInt(args[1]); 1347 1348 return run(outputDir, numReducers); 1349 } 1350 1351 public int run(String outputDir, int numReducers) throws Exception { 1352 return run(new Path(outputDir), numReducers); 1353 } 1354 1355 public int run(Path outputDir, int numReducers) throws Exception { 1356 LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers); 1357 1358 job = Job.getInstance(getConf()); 1359 1360 job.setJobName("Link Verifier"); 1361 job.setNumReduceTasks(numReducers); 1362 job.setJarByClass(getClass()); 1363 1364 setJobScannerConf(job); 1365 1366 Scan scan = new Scan(); 1367 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 1368 scan.setCaching(10000); 1369 scan.setCacheBlocks(false); 1370 if (isMultiUnevenColumnFamilies(getConf())) { 1371 scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME); 1372 scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME); 1373 } 1374 1375 TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), 
scan, 1376 VerifyMapper.class, BytesWritable.class, BytesWritable.class, job); 1377 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 1378 AbstractHBaseTool.class); 1379 1380 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 1381 1382 job.setReducerClass(VerifyReducer.class); 1383 job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class); 1384 job.setOutputKeyClass(BytesWritable.class); 1385 job.setOutputValueClass(BytesWritable.class); 1386 TextOutputFormat.setOutputPath(job, outputDir); 1387 1388 boolean success = job.waitForCompletion(true); 1389 1390 if (success) { 1391 Counters counters = job.getCounters(); 1392 if (null == counters) { 1393 LOG.warn("Counters were null, cannot verify Job completion." 1394 + " This is commonly a result of insufficient YARN configuration."); 1395 // We don't have access to the counters to know if we have "bad" counts 1396 return 0; 1397 } 1398 1399 // If we find no unexpected values, the job didn't outright fail 1400 if (verifyUnexpectedValues(counters)) { 1401 // We didn't check referenced+unreferenced counts, leave that to visual inspection 1402 return 0; 1403 } 1404 } 1405 1406 // We failed 1407 return 1; 1408 } 1409 1410 public boolean verify(long expectedReferenced) throws Exception { 1411 if (job == null) { 1412 throw new IllegalStateException("You should call run() first"); 1413 } 1414 1415 Counters counters = job.getCounters(); 1416 if (counters == null) { 1417 LOG.info("Counters object was null, write verification cannot be performed." 
1418 + " This is commonly a result of insufficient YARN configuration."); 1419 return false; 1420 } 1421 1422 // Run through each check, even if we fail one early 1423 boolean success = verifyExpectedValues(expectedReferenced, counters); 1424 1425 if (!verifyUnexpectedValues(counters)) { 1426 // We found counter objects which imply failure 1427 success = false; 1428 } 1429 1430 if (!success) { 1431 handleFailure(counters); 1432 } 1433 return success; 1434 } 1435 1436 /** 1437 * Verify the values in the Counters against the expected number of entries written. 1438 * 1439 * @param expectedReferenced 1440 * Expected number of referenced entrires 1441 * @param counters 1442 * The Job's Counters object 1443 * @return True if the values match what's expected, false otherwise 1444 */ 1445 protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) { 1446 final Counter referenced = counters.findCounter(Counts.REFERENCED); 1447 final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED); 1448 boolean success = true; 1449 1450 if (expectedReferenced != referenced.getValue()) { 1451 LOG.error("Expected referenced count does not match with actual referenced count. " + 1452 "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue()); 1453 success = false; 1454 } 1455 1456 if (unreferenced.getValue() > 0) { 1457 final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES); 1458 boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue()); 1459 LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue() 1460 + (couldBeMultiRef ? "; could be due to duplicate random numbers" : "")); 1461 success = false; 1462 } 1463 1464 return success; 1465 } 1466 1467 /** 1468 * Verify that the Counters don't contain values which indicate an outright failure from the Reducers. 
1469 * 1470 * @param counters 1471 * The Job's counters 1472 * @return True if the "bad" counter objects are 0, false otherwise 1473 */ 1474 protected boolean verifyUnexpectedValues(Counters counters) { 1475 final Counter undefined = counters.findCounter(Counts.UNDEFINED); 1476 final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES); 1477 boolean success = true; 1478 1479 if (undefined.getValue() > 0) { 1480 LOG.error("Found an undefined node. Undefined count=" + undefined.getValue()); 1481 success = false; 1482 } 1483 1484 if (lostfamilies.getValue() > 0) { 1485 LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue()); 1486 success = false; 1487 } 1488 1489 return success; 1490 } 1491 1492 protected void handleFailure(Counters counters) throws IOException { 1493 Configuration conf = job.getConfiguration(); 1494 TableName tableName = getTableName(conf); 1495 try (Connection conn = ConnectionFactory.createConnection(conf)) { 1496 try (RegionLocator rl = conn.getRegionLocator(tableName)) { 1497 CounterGroup g = counters.getGroup("undef"); 1498 Iterator<Counter> it = g.iterator(); 1499 while (it.hasNext()) { 1500 String keyString = it.next().getName(); 1501 byte[] key = Bytes.toBytes(keyString); 1502 HRegionLocation loc = rl.getRegionLocation(key, true); 1503 LOG.error("undefined row " + keyString + ", " + loc); 1504 } 1505 g = counters.getGroup("unref"); 1506 it = g.iterator(); 1507 while (it.hasNext()) { 1508 String keyString = it.next().getName(); 1509 byte[] key = Bytes.toBytes(keyString); 1510 HRegionLocation loc = rl.getRegionLocation(key, true); 1511 LOG.error("unreferred row " + keyString + ", " + loc); 1512 } 1513 } 1514 } 1515 } 1516 } 1517 1518 /** 1519 * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration 1520 * adds more data. 
1521 */ 1522 static class Loop extends Configured implements Tool { 1523 1524 private static final Logger LOG = LoggerFactory.getLogger(Loop.class); 1525 private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " + 1526 "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" + 1527 " <num walker threads>] \n" + 1528 "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" + 1529 "walkers will select and verify random flushed loop during Generation."; 1530 1531 IntegrationTestBigLinkedList it; 1532 1533 protected void runGenerator(int numMappers, long numNodes, 1534 String outputDir, Integer width, Integer wrapMultiplier, Integer numWalkers) 1535 throws Exception { 1536 Path outputPath = new Path(outputDir); 1537 UUID uuid = UUID.randomUUID(); //create a random UUID. 1538 Path generatorOutput = new Path(outputPath, uuid.toString()); 1539 1540 Generator generator = new Generator(); 1541 generator.setConf(getConf()); 1542 int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 1543 numWalkers); 1544 if (retCode > 0) { 1545 throw new RuntimeException("Generator failed with return code: " + retCode); 1546 } 1547 if (numWalkers > 0) { 1548 if (!generator.verify()) { 1549 throw new RuntimeException("Generator.verify failed"); 1550 } 1551 } 1552 } 1553 1554 protected void runVerify(String outputDir, 1555 int numReducers, long expectedNumNodes) throws Exception { 1556 Path outputPath = new Path(outputDir); 1557 UUID uuid = UUID.randomUUID(); //create a random UUID. 
1558 Path iterationOutput = new Path(outputPath, uuid.toString()); 1559 1560 Verify verify = new Verify(); 1561 verify.setConf(getConf()); 1562 int retCode = verify.run(iterationOutput, numReducers); 1563 if (retCode > 0) { 1564 throw new RuntimeException("Verify.run failed with return code: " + retCode); 1565 } 1566 1567 if (!verify.verify(expectedNumNodes)) { 1568 throw new RuntimeException("Verify.verify failed"); 1569 } 1570 LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes); 1571 } 1572 1573 @Override 1574 public int run(String[] args) throws Exception { 1575 if (args.length < 5) { 1576 System.err.println(USAGE); 1577 return 1; 1578 } 1579 try { 1580 int numIterations = Integer.parseInt(args[0]); 1581 int numMappers = Integer.parseInt(args[1]); 1582 long numNodes = Long.parseLong(args[2]); 1583 String outputDir = args[3]; 1584 int numReducers = Integer.parseInt(args[4]); 1585 Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); 1586 Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); 1587 Integer numWalkers = (args.length < 8) ? 
0 : Integer.parseInt(args[7]); 1588 1589 long expectedNumNodes = 0; 1590 1591 if (numIterations < 0) { 1592 numIterations = Integer.MAX_VALUE; //run indefinitely (kind of) 1593 } 1594 LOG.info("Running Loop with args:" + Arrays.deepToString(args)); 1595 for (int i = 0; i < numIterations; i++) { 1596 LOG.info("Starting iteration = " + i); 1597 runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers); 1598 expectedNumNodes += numMappers * numNodes; 1599 runVerify(outputDir, numReducers, expectedNumNodes); 1600 } 1601 return 0; 1602 } catch (NumberFormatException e) { 1603 System.err.println("Parsing loop arguments failed: " + e.getMessage()); 1604 System.err.println(USAGE); 1605 return 1; 1606 } 1607 } 1608 } 1609 1610 /** 1611 * A stand alone program that prints out portions of a list created by {@link Generator} 1612 */ 1613 private static class Print extends Configured implements Tool { 1614 @Override 1615 public int run(String[] args) throws Exception { 1616 Options options = new Options(); 1617 options.addOption("s", "start", true, "start key"); 1618 options.addOption("e", "end", true, "end key"); 1619 options.addOption("l", "limit", true, "number to print"); 1620 1621 GnuParser parser = new GnuParser(); 1622 CommandLine cmd = null; 1623 try { 1624 cmd = parser.parse(options, args); 1625 if (cmd.getArgs().length != 0) { 1626 throw new ParseException("Command takes no arguments"); 1627 } 1628 } catch (ParseException e) { 1629 System.err.println("Failed to parse command line " + e.getMessage()); 1630 System.err.println(); 1631 HelpFormatter formatter = new HelpFormatter(); 1632 formatter.printHelp(getClass().getSimpleName(), options); 1633 System.exit(-1); 1634 } 1635 1636 Connection connection = ConnectionFactory.createConnection(getConf()); 1637 Table table = connection.getTable(getTableName(getConf())); 1638 1639 Scan scan = new Scan(); 1640 scan.setBatch(10000); 1641 1642 if (cmd.hasOption("s")) 1643 
scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s"))); 1644 1645 if (cmd.hasOption("e")) 1646 scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e"))); 1647 1648 int limit = 0; 1649 if (cmd.hasOption("l")) 1650 limit = Integer.parseInt(cmd.getOptionValue("l")); 1651 else 1652 limit = 100; 1653 1654 ResultScanner scanner = table.getScanner(scan); 1655 1656 CINode node = new CINode(); 1657 Result result = scanner.next(); 1658 int count = 0; 1659 while (result != null && count++ < limit) { 1660 node = getCINode(result, node); 1661 System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key), 1662 Bytes.toStringBinary(node.prev), node.count, node.client); 1663 result = scanner.next(); 1664 } 1665 scanner.close(); 1666 table.close(); 1667 connection.close(); 1668 1669 return 0; 1670 } 1671 } 1672 1673 /** 1674 * A stand alone program that deletes a single node. 1675 */ 1676 private static class Delete extends Configured implements Tool { 1677 @Override 1678 public int run(String[] args) throws Exception { 1679 if (args.length != 1) { 1680 System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>"); 1681 return 0; 1682 } 1683 byte[] val = Bytes.toBytesBinary(args[0]); 1684 1685 org.apache.hadoop.hbase.client.Delete delete 1686 = new org.apache.hadoop.hbase.client.Delete(val); 1687 1688 try (Connection connection = ConnectionFactory.createConnection(getConf()); 1689 Table table = connection.getTable(getTableName(getConf()))) { 1690 table.delete(delete); 1691 } 1692 1693 System.out.println("Delete successful"); 1694 return 0; 1695 } 1696 } 1697 1698 abstract static class WalkerBase extends Configured{ 1699 protected static CINode findStartNode(Table table, byte[] startKey) throws IOException { 1700 Scan scan = new Scan(); 1701 scan.setStartRow(startKey); 1702 scan.setBatch(1); 1703 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 1704 1705 long t1 = System.currentTimeMillis(); 1706 ResultScanner scanner = table.getScanner(scan); 1707 
Result result = scanner.next(); 1708 long t2 = System.currentTimeMillis(); 1709 scanner.close(); 1710 1711 if ( result != null) { 1712 CINode node = getCINode(result, new CINode()); 1713 System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key)); 1714 return node; 1715 } 1716 1717 System.out.println("FSR " + (t2 - t1)); 1718 1719 return null; 1720 } 1721 protected CINode getNode(byte[] row, Table table, CINode node) throws IOException { 1722 Get get = new Get(row); 1723 get.addColumn(FAMILY_NAME, COLUMN_PREV); 1724 Result result = table.get(get); 1725 return getCINode(result, node); 1726 } 1727 } 1728 /** 1729 * A stand alone program that follows a linked list created by {@link Generator} and prints 1730 * timing info. 1731 */ 1732 private static class Walker extends WalkerBase implements Tool { 1733 1734 public Walker(){} 1735 1736 @Override 1737 public int run(String[] args) throws IOException { 1738 1739 Options options = new Options(); 1740 options.addOption("n", "num", true, "number of queries"); 1741 options.addOption("s", "start", true, "key to start at, binary string"); 1742 options.addOption("l", "logevery", true, "log every N queries"); 1743 1744 GnuParser parser = new GnuParser(); 1745 CommandLine cmd = null; 1746 try { 1747 cmd = parser.parse(options, args); 1748 if (cmd.getArgs().length != 0) { 1749 throw new ParseException("Command takes no arguments"); 1750 } 1751 } catch (ParseException e) { 1752 System.err.println("Failed to parse command line " + e.getMessage()); 1753 System.err.println(); 1754 HelpFormatter formatter = new HelpFormatter(); 1755 formatter.printHelp(getClass().getSimpleName(), options); 1756 System.exit(-1); 1757 } 1758 1759 long maxQueries = Long.MAX_VALUE; 1760 if (cmd.hasOption('n')) { 1761 maxQueries = Long.parseLong(cmd.getOptionValue("n")); 1762 } 1763 Random rand = new SecureRandom(); 1764 boolean isSpecificStart = cmd.hasOption('s'); 1765 1766 byte[] startKey = isSpecificStart ? 
Bytes.toBytesBinary(cmd.getOptionValue('s')) : null; 1767 int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1; 1768 1769 Connection connection = ConnectionFactory.createConnection(getConf()); 1770 Table table = connection.getTable(getTableName(getConf())); 1771 long numQueries = 0; 1772 // If isSpecificStart is set, only walk one list from that particular node. 1773 // Note that in case of circular (or P-shaped) list it will walk forever, as is 1774 // the case in normal run without startKey. 1775 while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) { 1776 if (!isSpecificStart) { 1777 startKey = new byte[ROWKEY_LENGTH]; 1778 rand.nextBytes(startKey); 1779 } 1780 CINode node = findStartNode(table, startKey); 1781 if (node == null && isSpecificStart) { 1782 System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey)); 1783 } 1784 numQueries++; 1785 while (node != null && node.prev.length != NO_KEY.length && 1786 numQueries < maxQueries) { 1787 byte[] prev = node.prev; 1788 long t1 = System.currentTimeMillis(); 1789 node = getNode(prev, table, node); 1790 long t2 = System.currentTimeMillis(); 1791 if (logEvery > 0 && numQueries % logEvery == 0) { 1792 System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev)); 1793 } 1794 numQueries++; 1795 if (node == null) { 1796 System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev)); 1797 } else if (node.prev.length == NO_KEY.length) { 1798 System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key)); 1799 } 1800 } 1801 } 1802 table.close(); 1803 connection.close(); 1804 return 0; 1805 } 1806 } 1807 1808 private static class Clean extends Configured implements Tool { 1809 @Override public int run(String[] args) throws Exception { 1810 if (args.length < 1) { 1811 System.err.println("Usage: Clean <output dir>"); 1812 return -1; 1813 } 1814 1815 Path p = new Path(args[0]); 1816 Configuration conf = getConf(); 
1817 TableName tableName = getTableName(conf); 1818 try (FileSystem fs = HFileSystem.get(conf); 1819 Connection conn = ConnectionFactory.createConnection(conf); 1820 Admin admin = conn.getAdmin()) { 1821 if (admin.tableExists(tableName)) { 1822 admin.disableTable(tableName); 1823 admin.deleteTable(tableName); 1824 } 1825 1826 if (fs.exists(p)) { 1827 fs.delete(p, true); 1828 } 1829 } 1830 1831 return 0; 1832 } 1833 } 1834 1835 static TableName getTableName(Configuration conf) { 1836 return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME)); 1837 } 1838 1839 private static CINode getCINode(Result result, CINode node) { 1840 node.key = Bytes.copy(result.getRow()); 1841 if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) { 1842 node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV)); 1843 } else { 1844 node.prev = NO_KEY; 1845 } 1846 if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) { 1847 node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT)); 1848 } else { 1849 node.count = -1; 1850 } 1851 if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) { 1852 node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT)); 1853 } else { 1854 node.client = ""; 1855 } 1856 return node; 1857 } 1858 1859 protected IntegrationTestingUtility util; 1860 1861 @Override 1862 public void setUpCluster() throws Exception { 1863 util = getTestingUtil(getConf()); 1864 boolean isDistributed = util.isDistributedCluster(); 1865 util.initializeCluster(isDistributed ? 
1 : this.NUM_SLAVES_BASE); 1866 if (!isDistributed) { 1867 util.startMiniMapReduceCluster(); 1868 } 1869 this.setConf(util.getConfiguration()); 1870 } 1871 1872 @Override 1873 public void cleanUpCluster() throws Exception { 1874 super.cleanUpCluster(); 1875 if (util.isDistributedCluster()) { 1876 util.shutdownMiniMapReduceCluster(); 1877 } 1878 } 1879 1880 private static boolean isMultiUnevenColumnFamilies(Configuration conf) { 1881 return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,true); 1882 } 1883 1884 @Test 1885 public void testContinuousIngest() throws IOException, Exception { 1886 //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers> 1887 Configuration conf = getTestingUtil(getConf()).getConfiguration(); 1888 if (isMultiUnevenColumnFamilies(getConf())) { 1889 // make sure per CF flush is on 1890 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName()); 1891 } 1892 int ret = 1893 ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000", 1894 util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" }); 1895 org.junit.Assert.assertEquals(0, ret); 1896 } 1897 1898 private void usage() { 1899 System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]"); 1900 printCommands(); 1901 } 1902 1903 private void printCommands() { 1904 System.err.println("Commands:"); 1905 System.err.println(" generator Map only job that generates data."); 1906 System.err.println(" verify A map reduce job that looks for holes. Check return code and"); 1907 System.err.println(" look at the counts after running. See REFERENCED and"); 1908 System.err.println(" UNREFERENCED are ok. Any UNDEFINED counts are bad. 
Do not run"); 1909 System.err.println(" with the Generator."); 1910 System.err.println(" walker " + 1911 "Standalone program that starts following a linked list & emits timing info."); 1912 System.err.println(" print Standalone program that prints nodes in the linked list."); 1913 System.err.println(" delete Standalone program that deletes a single node."); 1914 System.err.println(" loop Program to Loop through Generator and Verify steps"); 1915 System.err.println(" clean Program to clean all left over detritus."); 1916 System.err.println(" search Search for missing keys."); 1917 System.err.println(""); 1918 System.err.println("General options:"); 1919 System.err.println(" -D"+ TABLE_NAME_KEY+ "=<tableName>"); 1920 System.err.println(" Run using the <tableName> as the tablename. Defaults to " 1921 + DEFAULT_TABLE_NAME); 1922 System.err.println(" -D"+ HBaseTestingUtility.REGIONS_PER_SERVER_KEY+ "=<# regions>"); 1923 System.err.println(" Create table with presplit regions per server. Defaults to " 1924 + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER); 1925 1926 System.err.println(" -DuseMob=<true|false>"); 1927 System.err.println(" Create table so that the mob read/write path is forced. " + 1928 "Defaults to false"); 1929 1930 System.err.flush(); 1931 } 1932 1933 @Override 1934 protected void processOptions(CommandLine cmd) { 1935 super.processOptions(cmd); 1936 String[] args = cmd.getArgs(); 1937 //get the class, run with the conf 1938 if (args.length < 1) { 1939 printUsage(this.getClass().getSimpleName() + 1940 " <general options> COMMAND [<COMMAND options>]", "General options:", ""); 1941 printCommands(); 1942 // Have to throw an exception here to stop the processing. Looks ugly but gets message across. 
1943 throw new RuntimeException("Incorrect Number of args."); 1944 } 1945 toRun = args[0]; 1946 otherArgs = Arrays.copyOfRange(args, 1, args.length); 1947 } 1948 1949 @Override 1950 public int runTestFromCommandLine() throws Exception { 1951 Tool tool = null; 1952 if (toRun.equalsIgnoreCase("Generator")) { 1953 tool = new Generator(); 1954 } else if (toRun.equalsIgnoreCase("Verify")) { 1955 tool = new Verify(); 1956 } else if (toRun.equalsIgnoreCase("Loop")) { 1957 Loop loop = new Loop(); 1958 loop.it = this; 1959 tool = loop; 1960 } else if (toRun.equalsIgnoreCase("Walker")) { 1961 tool = new Walker(); 1962 } else if (toRun.equalsIgnoreCase("Print")) { 1963 tool = new Print(); 1964 } else if (toRun.equalsIgnoreCase("Delete")) { 1965 tool = new Delete(); 1966 } else if (toRun.equalsIgnoreCase("Clean")) { 1967 tool = new Clean(); 1968 } else if (toRun.equalsIgnoreCase("Search")) { 1969 tool = new Search(); 1970 } else { 1971 usage(); 1972 throw new RuntimeException("Unknown arg"); 1973 } 1974 1975 return ToolRunner.run(getConf(), tool, otherArgs); 1976 } 1977 1978 @Override 1979 public TableName getTablename() { 1980 Configuration c = getConf(); 1981 return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME)); 1982 } 1983 1984 @Override 1985 protected Set<String> getColumnFamilies() { 1986 if (isMultiUnevenColumnFamilies(getConf())) { 1987 return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME), 1988 Bytes.toString(TINY_FAMILY_NAME)); 1989 } else { 1990 return Sets.newHashSet(Bytes.toString(FAMILY_NAME)); 1991 } 1992 } 1993 1994 private static void setJobConf(Job job, int numMappers, long numNodes, 1995 Integer width, Integer wrapMultiplier, Integer numWalkers) { 1996 job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers); 1997 job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes); 1998 if (width != null) { 1999 job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width); 2000 } 2001 if 
(wrapMultiplier != null) { 2002 job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier); 2003 } 2004 if (numWalkers != null) { 2005 job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers); 2006 } 2007 } 2008 2009 public static void setJobScannerConf(Job job) { 2010 // Make sure scanners log something useful to make debugging possible. 2011 job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true); 2012 job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000); 2013 } 2014 2015 public static void main(String[] args) throws Exception { 2016 Configuration conf = HBaseConfiguration.create(); 2017 IntegrationTestingUtility.setUseDistributedCluster(conf); 2018 int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args); 2019 System.exit(ret); 2020 } 2021}