001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.test; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Set; 030import java.util.SortedSet; 031import java.util.TreeSet; 032import java.util.UUID; 033import java.util.concurrent.ThreadLocalRandom; 034import java.util.concurrent.atomic.AtomicInteger; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.conf.Configured; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.LocatedFileStatus; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.fs.RemoteIterator; 041import org.apache.hadoop.hbase.Cell; 042import org.apache.hadoop.hbase.HBaseConfiguration; 043import org.apache.hadoop.hbase.HBaseTestingUtility; 044import org.apache.hadoop.hbase.HColumnDescriptor; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.HRegionLocation; 047import org.apache.hadoop.hbase.HTableDescriptor; 048import org.apache.hadoop.hbase.IntegrationTestBase; 049import org.apache.hadoop.hbase.IntegrationTestingUtility; 050import org.apache.hadoop.hbase.MasterNotRunningException; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.client.Admin; 053import org.apache.hadoop.hbase.client.BufferedMutator; 054import org.apache.hadoop.hbase.client.BufferedMutatorParams; 055import org.apache.hadoop.hbase.client.Connection; 056import org.apache.hadoop.hbase.client.ConnectionConfiguration; 057import org.apache.hadoop.hbase.client.ConnectionFactory; 058import org.apache.hadoop.hbase.client.Get; 059import org.apache.hadoop.hbase.client.Mutation; 060import org.apache.hadoop.hbase.client.Put; 061import org.apache.hadoop.hbase.client.RegionLocator; 062import org.apache.hadoop.hbase.client.Result; 063import org.apache.hadoop.hbase.client.ResultScanner; 064import org.apache.hadoop.hbase.client.Scan; 065import org.apache.hadoop.hbase.client.ScannerCallable; 066import org.apache.hadoop.hbase.client.Table; 067import org.apache.hadoop.hbase.fs.HFileSystem; 068import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 069import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 070import org.apache.hadoop.hbase.mapreduce.TableMapper; 071import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl; 072import org.apache.hadoop.hbase.mapreduce.WALPlayer; 073import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy; 074import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; 075import 
org.apache.hadoop.hbase.testclassification.IntegrationTests; 076import org.apache.hadoop.hbase.util.AbstractHBaseTool; 077import org.apache.hadoop.hbase.util.Bytes; 078import org.apache.hadoop.hbase.util.CommonFSUtils; 079import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 080import org.apache.hadoop.hbase.util.Random64; 081import org.apache.hadoop.hbase.util.RegionSplitter; 082import org.apache.hadoop.hbase.wal.WALEdit; 083import org.apache.hadoop.hbase.wal.WALKey; 084import org.apache.hadoop.io.BytesWritable; 085import org.apache.hadoop.io.NullWritable; 086import org.apache.hadoop.io.Writable; 087import org.apache.hadoop.mapreduce.Counter; 088import org.apache.hadoop.mapreduce.CounterGroup; 089import org.apache.hadoop.mapreduce.Counters; 090import org.apache.hadoop.mapreduce.InputFormat; 091import org.apache.hadoop.mapreduce.InputSplit; 092import org.apache.hadoop.mapreduce.Job; 093import org.apache.hadoop.mapreduce.JobContext; 094import org.apache.hadoop.mapreduce.Mapper; 095import org.apache.hadoop.mapreduce.RecordReader; 096import org.apache.hadoop.mapreduce.Reducer; 097import org.apache.hadoop.mapreduce.TaskAttemptContext; 098import org.apache.hadoop.mapreduce.TaskAttemptID; 099import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 100import org.apache.hadoop.mapreduce.lib.input.FileSplit; 101import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat; 102import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 103import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 104import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 105import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat; 106import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 107import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 108import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; 109import org.apache.hadoop.util.Tool; 110import org.apache.hadoop.util.ToolRunner; 111import org.junit.Test; 112import org.junit.experimental.categories.Category; 113import org.slf4j.Logger; 114import org.slf4j.LoggerFactory; 115 116import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 117import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 118import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; 119import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 120import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser; 121import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; 122import org.apache.hbase.thirdparty.org.apache.commons.cli.Options; 123import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; 124 125/** 126 * <p> 127 * This is an integration test borrowed from goraci, written by Keith Turner, which is in turn 128 * inspired by the Accumulo test called continuous ingest (ci). The original source code can be found 129 * here: 130 * <ul> 131 * <li>https://github.com/keith-turner/goraci</li> 132 * <li>https://github.com/enis/goraci/</li> 133 * </ul> 134 * </p> 135 * <p> 136 * Apache Accumulo [0] has a simple test suite that verifies that data is not lost at scale. This 137 * test suite is called continuous ingest. This test runs many ingest clients that continually 138 * create linked lists containing 25 million nodes. At some point the clients are stopped and a map 139 * reduce job is run to ensure no linked list has a hole. A hole indicates data was lost.
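 * This HBase port follows the same generate-then-verify cycle using the programs described below.
 * As a rough illustration (the paths, mapper/reducer counts and node count here are made up, not
 * prescribed defaults), the two phases can be driven by hand like this, or together via the
 * {@code loop} command shown at the end of this comment:
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList generator 4 25000000 /tmp/itbll-keys
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList verify /tmp/itbll-verify 16
 * </pre>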
140 * </p> 141 * <p> 142 * The nodes in the linked list are random. This causes each linked list to spread across the table. 143 * Therefore, if one part of a table loses data, then it will be detected by references in another 144 * part of the table. 145 * </p> 146 * <p> 147 * <h3>THE ANATOMY OF THE TEST</h3> Below is a rough sketch of how data is written. For specific 148 * details look at the Generator code. 149 * </p> 150 * <p> 151 * <ol> 152 * <li>Write out 1 million nodes (1M is the configurable 'width' mentioned below)</li> 153 * <li>Flush the client</li> 154 * <li>Write out 1 million nodes that reference the previous million</li> 155 * <li>If this is the 25th set of 1 million nodes, then update the 1st set of 1 million to point to the last 156 * (25 is configurable; it's the 'wrap multiplier' referred to below)</li> 157 * <li>goto 1</li> 158 * </ol> 159 * </p> 160 * <p> 161 * The key is that nodes only reference flushed nodes. Therefore a node should never reference a 162 * missing node, even if the ingest client is killed at any point in time. 163 * </p> 164 * <p> 165 * When running this test suite w/ Accumulo there is a script running in parallel called the 166 * Agitator that randomly and continuously kills server processes. The outcome was that many data 167 * loss bugs were found in Accumulo by doing this. This test suite can also help find bugs that 168 * impact uptime and stability when run for days or weeks. 169 * </p> 170 * <p> 171 * This test suite consists of the following: 172 * <ul> 173 * <li>a few Java programs</li> 174 * <li>a little helper script to run the Java programs</li> 175 * <li>a Maven script to build it</li> 176 * </ul> 177 * </p> 178 * <p> 179 * When generating data, it's best to have each map task generate a multiple of 25 million. The 180 * reason for this is that circular linked lists are generated every 25M. Not generating a multiple 181 * of 25M will result in some nodes in the linked list not having references. The loss of an 182 * unreferenced node cannot be detected. 183 * </p> 184 * <p> 185 * <h3>Below is a description of the Java programs</h3> 186 * <ul> 187 * <li>{@code Generator} - A map only job that generates data. As stated previously, it's best to 188 * generate data in multiples of 25M. An option is also available to allow concurrent walkers to 189 * select and walk random flushed loops during this phase.</li> 190 * <li>{@code Verify} - A map reduce job that looks for holes. Look at the counts after running. 191 * {@code REFERENCED} and {@code UNREFERENCED} are ok, any {@code UNDEFINED} counts are bad.
Do not 192 * run at the same time as the Generator.</li> 193 * <li>{@code Walker} - A standalone program that starts following a linked list and emits timing 194 * info.</li> 195 * <li>{@code Print} - A standalone program that prints nodes in the linked list</li> 196 * <li>{@code Delete} - A standalone program that deletes a single node</li> 197 * </ul> 198 * This class can be run as a unit test, as an integration test, or from the command line. 199 * </p> 200 * <p> 201 * ex: 202 * 203 * <pre> 204 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 205 * loop 2 1 100000 /temp 1 1000 50 1 0 206 * </pre> 207 * </p> 208 */ 209@Category(IntegrationTests.class) 210public class IntegrationTestBigLinkedList extends IntegrationTestBase { 211 protected static final byte[] NO_KEY = new byte[1]; 212 protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table"; 213 protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList"; 214 protected static byte[] FAMILY_NAME = Bytes.toBytes("meta"); 215 private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big"); 216 private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny"); 217 218 // link to the id of the prev node in the linked list 219 protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev"); 220 221 // identifier of the mapred task that generated this row 222 protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client"); 223 224 // the id of the row within the same client. 225 protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count"); 226 227 /** How many rows to write per map task. This has to be a multiple of 25M */ 228 private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY = 229 "IntegrationTestBigLinkedList.generator.num_rows"; 230 231 private static final String GENERATOR_NUM_MAPPERS_KEY = 232 "IntegrationTestBigLinkedList.generator.map.tasks"; 233 234 private static final String GENERATOR_WIDTH_KEY = "IntegrationTestBigLinkedList.generator.width"; 235 236 private static final String GENERATOR_WRAP_KEY = "IntegrationTestBigLinkedList.generator.wrap"; 237 238 private static final String CONCURRENT_WALKER_KEY = 239 "IntegrationTestBigLinkedList.generator.concurrentwalkers"; 240 241 protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster 242 243 private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters 244 245 private static final int WIDTH_DEFAULT = 1000000; 246 247 /** 248 * The 'wrap multiplier' default. 249 */ 250 private static final int WRAP_DEFAULT = 25; 251 private static final int ROWKEY_LENGTH = 16; 252 253 private static final int CONCURRENT_WALKER_DEFAULT = 0; 254 255 protected String toRun; 256 protected String[] otherArgs; 257 258 static class CINode { 259 byte[] key; 260 byte[] prev; 261 String client; 262 long count; 263 } 264 265 /** 266 * A Map only job that generates random linked lists and stores them. 267 */ 268 static class Generator extends Configured implements Tool { 269 private static final Logger LOG = LoggerFactory.getLogger(Generator.class); 270 271 /** 272 * Set this configuration if you want to test that single-column family flush works. If set, we will 273 * add a big column family and a small column family on either side of the usual ITBLL 'meta' 274 * column family. When we write out the ITBLL, we will also add to the big column family a value 275 * bigger than that for ITBLL and for small, something way smaller.
The idea is that when 276 * flush-by-column family rather than by region is enabled, we can see if ITBLL is broke in any 277 * way. Here is how you would pass it: 278 * <p> 279 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 280 * -Dgenerator.multiple.columnfamilies=true generator 1 10 g 281 */ 282 public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY = 283 "generator.multiple.columnfamilies"; 284 285 /** 286 * Set this configuration if you want to scale up the size of test data quickly. 287 * <p> 288 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList 289 * -Dgenerator.big.family.value.size=1024 generator 1 10 output 290 */ 291 public static final String BIG_FAMILY_VALUE_SIZE_KEY = "generator.big.family.value.size"; 292 293 public static enum GeneratorCounts { 294 SUCCESS, 295 TERMINATING, 296 UNDEFINED, 297 IOEXCEPTION 298 } 299 300 public static final String USAGE = "Usage : " + Generator.class.getSimpleName() 301 + " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>" 302 + " <num walker threads>] \n" 303 + "Where <num nodes per map> should be a multiple of 'width' * 'wrap multiplier'.\n" 304 + "25M is default because default 'width' is 1M and default 'wrap multiplier' is 25.\n" 305 + "We write out 1M nodes and then flush the client. After 25 flushes, we connect \n" 306 + "first written nodes back to the 25th set.\n" 307 + "Walkers verify random flushed loops during Generation."; 308 309 public Job job; 310 311 static class GeneratorInputFormat extends InputFormat<BytesWritable, NullWritable> { 312 static class GeneratorInputSplit extends InputSplit implements Writable { 313 @Override 314 public long getLength() throws IOException, InterruptedException { 315 return 1; 316 } 317 318 @Override 319 public String[] getLocations() throws IOException, InterruptedException { 320 return new String[0]; 321 } 322 323 @Override 324 public void readFields(DataInput arg0) throws IOException { 325 } 326 327 @Override 328 public void write(DataOutput arg0) throws IOException { 329 } 330 } 331 332 static class GeneratorRecordReader extends RecordReader<BytesWritable, NullWritable> { 333 private long count; 334 private long numNodes; 335 // Use Random64 to avoid issue described in HBASE-21256. 
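      // (Background: java.util.Random keeps only 48 bits of internal state, so at the scale ITBLL
      // generates 16-byte keys it cannot produce enough distinct values and duplicate row keys
      // become likely; Random64 uses 64-bit state to avoid those collisions.)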
336 private Random64 rand = new Random64(); 337 338 @Override 339 public void close() throws IOException { 340 } 341 342 @Override 343 public BytesWritable getCurrentKey() throws IOException, InterruptedException { 344 byte[] bytes = new byte[ROWKEY_LENGTH]; 345 rand.nextBytes(bytes); 346 return new BytesWritable(bytes); 347 } 348 349 @Override 350 public NullWritable getCurrentValue() throws IOException, InterruptedException { 351 return NullWritable.get(); 352 } 353 354 @Override 355 public float getProgress() throws IOException, InterruptedException { 356 return (float) (count / (double) numNodes); 357 } 358 359 @Override 360 public void initialize(InputSplit arg0, TaskAttemptContext context) 361 throws IOException, InterruptedException { 362 numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000); 363 } 364 365 @Override 366 public boolean nextKeyValue() throws IOException, InterruptedException { 367 return count++ < numNodes; 368 } 369 } 370 371 @Override 372 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 373 TaskAttemptContext context) throws IOException, InterruptedException { 374 GeneratorRecordReader rr = new GeneratorRecordReader(); 375 rr.initialize(split, context); 376 return rr; 377 } 378 379 @Override 380 public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { 381 int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1); 382 383 ArrayList<InputSplit> splits = new ArrayList<>(numMappers); 384 385 for (int i = 0; i < numMappers; i++) { 386 splits.add(new GeneratorInputSplit()); 387 } 388 389 return splits; 390 } 391 } 392 393 /** Ensure output files from prev-job go to map inputs for current job */ 394 static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> { 395 @Override 396 protected boolean isSplitable(JobContext context, Path filename) { 397 return false; 398 } 399 } 400 401 /** 402 * Some ASCII art time: 403 * <p> 404 * [ . . . ] represents one batch of random longs of length WIDTH 405 * 406 * <pre> 407 * _________________________ 408 * | ______ | 409 * | | || 410 * .-+-----------------+-----.|| 411 * | | | ||| 412 * first = [ . . . . . . . . . . . ] ||| 413 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 414 * | | | | | | | | | | | ||| 415 * prev = [ . . . . . . . . . . . ] ||| 416 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ||| 417 * | | | | | | | | | | | ||| 418 * current = [ . . . . . . . . . . . ] ||| 419 * ||| 420 * ... ||| 421 * ||| 422 * last = [ . . . . . . . . . . . ] ||| 423 * ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____||| 424 * | |________|| 425 * |___________________________| 426 * </pre> 427 */ 428 429 static class GeneratorMapper 430 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 431 432 byte[][] first = null; 433 byte[][] prev = null; 434 byte[][] current = null; 435 byte[] id; 436 long count = 0; 437 int i; 438 BufferedMutator mutator; 439 Connection connection; 440 long numNodes; 441 long wrap; 442 int width; 443 boolean multipleUnevenColumnFamilies; 444 byte[] tinyValue = new byte[] { 't' }; 445 byte[] bigValue = null; 446 Configuration conf; 447 // Use Random64 to avoid issue described in HBASE-21256. 
448 private Random64 rand = new Random64(); 449 450 volatile boolean walkersStop; 451 int numWalkers; 452 final Object flushedLoopsLock = new Object(); 453 volatile List<Long> flushedLoops = new ArrayList<>(); 454 List<Thread> walkers = new ArrayList<>(); 455 456 @Override 457 protected void setup(Context context) throws IOException, InterruptedException { 458 id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID()); 459 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 460 instantiateHTable(); 461 this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT); 462 current = new byte[this.width][]; 463 int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT); 464 this.wrap = (long) wrapMultiplier * width; 465 this.numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 466 (long) WIDTH_DEFAULT * WRAP_DEFAULT); 467 if (this.numNodes < this.wrap) { 468 this.wrap = this.numNodes; 469 } 470 this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); 471 this.numWalkers = 472 context.getConfiguration().getInt(CONCURRENT_WALKER_KEY, CONCURRENT_WALKER_DEFAULT); 473 this.walkersStop = false; 474 this.conf = context.getConfiguration(); 475 476 if (multipleUnevenColumnFamilies) { 477 int n = context.getConfiguration().getInt(BIG_FAMILY_VALUE_SIZE_KEY, 256); 478 int limit = 479 context.getConfiguration().getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 480 ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT); 481 482 Preconditions.checkArgument(n <= limit, "%s(%s) > %s(%s)", BIG_FAMILY_VALUE_SIZE_KEY, n, 483 ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, limit); 484 485 bigValue = new byte[n]; 486 rand.nextBytes(bigValue); 487 LOG.info("Create a bigValue with " + n + " bytes."); 488 } 489 490 Preconditions.checkArgument(numNodes > 0, "numNodes(%s) <= 0", numNodes); 491 Preconditions.checkArgument(numNodes % width == 0, "numNodes(%s) mod width(%s) != 0", 492 numNodes, width); 493 Preconditions.checkArgument(numNodes % wrap == 0, "numNodes(%s) mod wrap(%s) != 0", 494 numNodes, wrap); 495 } 496 497 protected void instantiateHTable() throws IOException { 498 mutator = connection 499 .getBufferedMutator(new BufferedMutatorParams(getTableName(connection.getConfiguration())) 500 .writeBufferSize(4 * 1024 * 1024)); 501 } 502 503 @Override 504 protected void cleanup(Context context) throws IOException, InterruptedException { 505 joinWalkers(); 506 mutator.close(); 507 connection.close(); 508 } 509 510 @Override 511 protected void map(BytesWritable key, NullWritable value, Context output) throws IOException { 512 current[i] = new byte[key.getLength()]; 513 System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength()); 514 if (++i == current.length) { 515 LOG.debug("Persisting current.length={}, count={}, id={}, current={}, i=", current.length, 516 count, Bytes.toStringBinary(id), Bytes.toStringBinary(current[0]), i); 517 persist(output, count, prev, current, id); 518 i = 0; 519 520 if (first == null) { 521 first = current; 522 } 523 prev = current; 524 current = new byte[this.width][]; 525 526 count += current.length; 527 output.setStatus("Count " + count); 528 529 if (count % wrap == 0) { 530 // this block of code turns the 1 million linked list of length 25 into one giant 531 // circular linked list of 25 million 532 circularLeftShift(first); 533 persist(output, -1, prev, first, null); 534 // At this point the entire loop has been 
flushed so we can add one of its nodes to the 535 // concurrent walker 536 if (numWalkers > 0) { 537 addFlushed(key.getBytes()); 538 if (walkers.isEmpty()) { 539 startWalkers(numWalkers, conf, output); 540 } 541 } 542 first = null; 543 prev = null; 544 } 545 } 546 } 547 548 private static <T> void circularLeftShift(T[] first) { 549 T ez = first[0]; 550 System.arraycopy(first, 1, first, 0, first.length - 1); 551 first[first.length - 1] = ez; 552 } 553 554 private void addFlushed(byte[] rowKey) { 555 synchronized (flushedLoopsLock) { 556 flushedLoops.add(Bytes.toLong(rowKey)); 557 flushedLoopsLock.notifyAll(); 558 } 559 } 560 561 protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id) 562 throws IOException { 563 for (int i = 0; i < current.length; i++) { 564 565 if (i % 100 == 0) { 566 // Tickle progress every so often else maprunner will think us hung 567 output.progress(); 568 } 569 570 Put put = new Put(current[i]); 571 put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]); 572 573 if (count >= 0) { 574 put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i)); 575 } 576 if (id != null) { 577 put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id); 578 } 579 // See if we are to write multiple columns. 580 if (this.multipleUnevenColumnFamilies) { 581 // Use any column name. 582 put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue); 583 // Use any column name. 584 put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue); 585 } 586 mutator.mutate(put); 587 } 588 589 mutator.flush(); 590 } 591 592 private void startWalkers(int numWalkers, Configuration conf, Context context) { 593 LOG.info("Starting " + numWalkers + " concurrent walkers"); 594 for (int i = 0; i < numWalkers; i++) { 595 Thread walker = new Thread(new ContinuousConcurrentWalker(conf, context)); 596 walker.start(); 597 walkers.add(walker); 598 } 599 } 600 601 private void joinWalkers() { 602 synchronized (flushedLoopsLock) { 603 walkersStop = true; 604 flushedLoopsLock.notifyAll(); 605 } 606 for (Thread walker : walkers) { 607 Uninterruptibles.joinUninterruptibly(walker); 608 } 609 } 610 611 /** 612 * Randomly selects and walks a random flushed loop concurrently with the Generator Mapper by 613 * spawning ConcurrentWalker's with specified StartNodes. These ConcurrentWalker's are 614 * configured to only log erroneous nodes. 
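 * <p>
 * In rough outline (all names below are the fields and methods of this mapper): the mapper
 * publishes each completed loop through addFlushed(), a walker blocks in selectLoop() on
 * flushedLoopsLock until a loop is available, then walkLoop() follows 'wrap' nodes of it with a
 * ConcurrentWalker; cleanup() stops the walkers by setting walkersStop, notifying the lock and
 * joining the threads.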
615 */ 616 617 public class ContinuousConcurrentWalker implements Runnable { 618 619 ConcurrentWalker walker; 620 Configuration conf; 621 Context context; 622 623 public ContinuousConcurrentWalker(Configuration conf, Context context) { 624 this.conf = conf; 625 this.context = context; 626 } 627 628 @Override 629 public void run() { 630 while (!walkersStop) { 631 try { 632 long node = selectLoop(); 633 try { 634 walkLoop(node); 635 } catch (IOException e) { 636 context.getCounter(GeneratorCounts.IOEXCEPTION).increment(1l); 637 return; 638 } 639 } catch (InterruptedException e) { 640 return; 641 } 642 } 643 } 644 645 private void walkLoop(long node) throws IOException { 646 walker = new ConcurrentWalker(context); 647 walker.setConf(conf); 648 walker.run(node, wrap); 649 } 650 651 private long selectLoop() throws InterruptedException { 652 synchronized (flushedLoopsLock) { 653 while (flushedLoops.isEmpty() && !walkersStop) { 654 flushedLoopsLock.wait(); 655 } 656 if (walkersStop) { 657 throw new InterruptedException(); 658 } 659 return flushedLoops.get(ThreadLocalRandom.current().nextInt(flushedLoops.size())); 660 } 661 } 662 } 663 664 public static class ConcurrentWalker extends WalkerBase { 665 666 Context context; 667 668 public ConcurrentWalker(Context context) { 669 this.context = context; 670 } 671 672 public void run(long startKeyIn, long maxQueriesIn) throws IOException { 673 674 long maxQueries = maxQueriesIn > 0 ? maxQueriesIn : Long.MAX_VALUE; 675 byte[] startKey = Bytes.toBytes(startKeyIn); 676 677 Connection connection = ConnectionFactory.createConnection(getConf()); 678 Table table = connection.getTable(getTableName(getConf())); 679 long numQueries = 0; 680 // If isSpecificStart is set, only walk one list from that particular node. 681 // Note that in case of circular (or P-shaped) list it will walk forever, as is 682 // the case in normal run without startKey. 683 684 CINode node = findStartNode(table, startKey); 685 if (node == null) { 686 LOG.error("Start node not found: " + Bytes.toStringBinary(startKey)); 687 throw new IOException("Start node not found: " + startKeyIn); 688 } 689 while (numQueries < maxQueries) { 690 numQueries++; 691 byte[] prev = node.prev; 692 node = getNode(prev, table, node); 693 if (node == null) { 694 LOG.error("ConcurrentWalker found UNDEFINED NODE: " + Bytes.toStringBinary(prev)); 695 context.getCounter(GeneratorCounts.UNDEFINED).increment(1l); 696 } else if (node.prev.length == NO_KEY.length) { 697 LOG.error( 698 "ConcurrentWalker found TERMINATING NODE: " + Bytes.toStringBinary(node.key)); 699 context.getCounter(GeneratorCounts.TERMINATING).increment(1l); 700 } else { 701 // Increment for successful walk 702 context.getCounter(GeneratorCounts.SUCCESS).increment(1l); 703 } 704 } 705 table.close(); 706 connection.close(); 707 } 708 } 709 } 710 711 @Override 712 public int run(String[] args) throws Exception { 713 if (args.length < 3) { 714 System.err.println(USAGE); 715 return 1; 716 } 717 try { 718 int numMappers = Integer.parseInt(args[0]); 719 long numNodes = Long.parseLong(args[1]); 720 Path tmpOutput = new Path(args[2]); 721 Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]); 722 Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]); 723 Integer numWalkers = (args.length < 6) ? 
null : Integer.parseInt(args[5]); 724 return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 725 } catch (NumberFormatException e) { 726 System.err.println("Parsing generator arguments failed: " + e.getMessage()); 727 System.err.println(USAGE); 728 return 1; 729 } 730 } 731 732 protected void createSchema() throws IOException { 733 Configuration conf = getConf(); 734 TableName tableName = getTableName(conf); 735 try (Connection conn = ConnectionFactory.createConnection(conf); 736 Admin admin = conn.getAdmin()) { 737 if (!admin.tableExists(tableName)) { 738 HTableDescriptor htd = new HTableDescriptor(getTableName(getConf())); 739 htd.addFamily(new HColumnDescriptor(FAMILY_NAME)); 740 // Always add these families. Just skip writing to them when we do not test per CF flush. 741 htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME)); 742 htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME)); 743 // if -DuseMob=true force all data through mob path. 744 if (conf.getBoolean("useMob", false)) { 745 for (HColumnDescriptor hcd : htd.getColumnFamilies()) { 746 hcd.setMobEnabled(true); 747 hcd.setMobThreshold(4); 748 } 749 } 750 751 // If we want to pre-split compute how many splits. 752 if ( 753 conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY, 754 HBaseTestingUtility.PRESPLIT_TEST_TABLE) 755 ) { 756 int numberOfServers = admin.getRegionServers().size(); 757 if (numberOfServers == 0) { 758 throw new IllegalStateException("No live regionservers"); 759 } 760 int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY, 761 HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER); 762 int totalNumberOfRegions = numberOfServers * regionsPerServer; 763 LOG.info("Number of live regionservers: " + numberOfServers + ", " 764 + "pre-splitting table into " + totalNumberOfRegions + " regions " 765 + "(default regions per server: " + regionsPerServer + ")"); 766 767 byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions); 768 769 admin.createTable(htd, splits); 770 } else { 771 // Looks like we're just letting things play out. 772 // Create a table with on region by default. 773 // This will make the splitting work hard. 774 admin.createTable(htd); 775 } 776 } 777 } catch (MasterNotRunningException e) { 778 LOG.error("Master not running", e); 779 throw new IOException(e); 780 } 781 } 782 783 public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width, 784 Integer wrapMultiplier, Integer numWalkers) throws Exception { 785 LOG.info( 786 "Running RandomInputGenerator with numMappers=" + numMappers + ", numNodes=" + numNodes); 787 Job job = Job.getInstance(getConf()); 788 789 job.setJobName("Random Input Generator"); 790 job.setNumReduceTasks(0); 791 job.setJarByClass(getClass()); 792 793 job.setInputFormatClass(GeneratorInputFormat.class); 794 job.setOutputKeyClass(BytesWritable.class); 795 job.setOutputValueClass(NullWritable.class); 796 797 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 798 799 job.setMapperClass(Mapper.class); // identity mapper 800 801 FileOutputFormat.setOutputPath(job, tmpOutput); 802 job.setOutputFormatClass(SequenceFileOutputFormat.class); 803 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Random64.class); 804 805 boolean success = jobCompletion(job); 806 807 return success ? 
0 : 1; 808 } 809 810 public int runGenerator(int numMappers, long numNodes, Path tmpOutput, Integer width, 811 Integer wrapMultiplier, Integer numWalkers) throws Exception { 812 LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes); 813 createSchema(); 814 job = Job.getInstance(getConf()); 815 816 job.setJobName("Link Generator"); 817 job.setNumReduceTasks(0); 818 job.setJarByClass(getClass()); 819 820 FileInputFormat.setInputPaths(job, tmpOutput); 821 job.setInputFormatClass(OneFilePerMapperSFIF.class); 822 job.setOutputKeyClass(NullWritable.class); 823 job.setOutputValueClass(NullWritable.class); 824 825 setJobConf(job, numMappers, numNodes, width, wrapMultiplier, numWalkers); 826 827 setMapperForGenerator(job); 828 829 job.setOutputFormatClass(NullOutputFormat.class); 830 831 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 832 TableMapReduceUtil.addDependencyJars(job); 833 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 834 AbstractHBaseTool.class); 835 TableMapReduceUtil.initCredentials(job); 836 837 boolean success = jobCompletion(job); 838 839 return success ? 0 : 1; 840 } 841 842 protected boolean jobCompletion(Job job) 843 throws IOException, InterruptedException, ClassNotFoundException { 844 boolean success = job.waitForCompletion(true); 845 return success; 846 } 847 848 protected void setMapperForGenerator(Job job) { 849 job.setMapperClass(GeneratorMapper.class); 850 } 851 852 public int run(int numMappers, long numNodes, Path tmpOutput, Integer width, 853 Integer wrapMultiplier, Integer numWalkers) throws Exception { 854 int ret = 855 runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 856 if (ret > 0) { 857 return ret; 858 } 859 return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier, numWalkers); 860 } 861 862 public boolean verify() { 863 try { 864 Counters counters = job.getCounters(); 865 if (counters == null) { 866 LOG.info("Counters object was null, Generator verification cannot be performed." 867 + " This is commonly a result of insufficient YARN configuration."); 868 return false; 869 } 870 871 if ( 872 counters.findCounter(GeneratorCounts.TERMINATING).getValue() > 0 873 || counters.findCounter(GeneratorCounts.UNDEFINED).getValue() > 0 874 || counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue() > 0 875 ) { 876 LOG.error("Concurrent walker failed to verify during Generation phase"); 877 LOG.error( 878 "TERMINATING nodes: " + counters.findCounter(GeneratorCounts.TERMINATING).getValue()); 879 LOG.error( 880 "UNDEFINED nodes: " + counters.findCounter(GeneratorCounts.UNDEFINED).getValue()); 881 LOG.error( 882 "IOEXCEPTION nodes: " + counters.findCounter(GeneratorCounts.IOEXCEPTION).getValue()); 883 return false; 884 } 885 } catch (IOException e) { 886 LOG.info("Generator verification could not find counter"); 887 return false; 888 } 889 return true; 890 } 891 } 892 893 /** 894 * Tool to search missing rows in WALs and hfiles. Pass in file or dir of keys to search for. Key 895 * file must have been written by Verify step (we depend on the format it writes out. We'll read 896 * them in and then search in hbase WALs and oldWALs dirs (Some of this is TODO). 
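 * <p>
 * An illustrative invocation (the path is made up, and this assumes the outer
 * IntegrationTestBigLinkedList command dispatch accepts a {@code search} command the way it does
 * {@code generator} and {@code loop}):
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList search /tmp/itbll-verify 4
 * </pre>
 *
 * The first argument is the KEYS_DIR written by the Verify step; the optional second argument is
 * the number of mappers.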
897 */ 898 static class Search extends Configured implements Tool { 899 private static final Logger LOG = LoggerFactory.getLogger(Search.class); 900 protected Job job; 901 902 private static void printUsage(final String error) { 903 if (error != null && error.length() > 0) System.out.println("ERROR: " + error); 904 System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]"); 905 } 906 907 @Override 908 public int run(String[] args) throws Exception { 909 if (args.length < 1 || args.length > 2) { 910 printUsage(null); 911 return 1; 912 } 913 Path inputDir = new Path(args[0]); 914 int numMappers = 1; 915 if (args.length > 1) { 916 numMappers = Integer.parseInt(args[1]); 917 } 918 return run(inputDir, numMappers); 919 } 920 921 /** 922 * WALPlayer override that searches for keys loaded in the setup. 923 */ 924 public static class WALSearcher extends WALPlayer { 925 public WALSearcher(Configuration conf) { 926 super(conf); 927 } 928 929 /** 930 * The actual searcher mapper. 931 */ 932 public static class WALMapperSearcher extends WALMapper { 933 private SortedSet<byte[]> keysToFind; 934 private AtomicInteger rows = new AtomicInteger(0); 935 936 @Override 937 public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) 938 throws IOException { 939 super.setup(context); 940 try { 941 this.keysToFind = readKeysToSearch(context.getConfiguration()); 942 LOG.info("Loaded keys to find: count=" + this.keysToFind.size()); 943 } catch (InterruptedException e) { 944 throw new InterruptedIOException(e.toString()); 945 } 946 } 947 948 @Override 949 protected boolean filter(Context context, Cell cell) { 950 // TODO: Can I do a better compare than this copying out key? 951 byte[] row = new byte[cell.getRowLength()]; 952 System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength()); 953 boolean b = this.keysToFind.contains(row); 954 if (b) { 955 String keyStr = Bytes.toStringBinary(row); 956 try { 957 LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey()); 958 } catch (IOException | InterruptedException e) { 959 LOG.warn(e.toString(), e); 960 } 961 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 962 context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1); 963 } 964 context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1); 965 } 966 return b; 967 } 968 } 969 970 // Put in place the above WALMapperSearcher. 971 @Override 972 public Job createSubmittableJob(String[] args) throws IOException { 973 Job job = super.createSubmittableJob(args); 974 // Call my class instead. 975 job.setJarByClass(WALMapperSearcher.class); 976 job.setMapperClass(WALMapperSearcher.class); 977 job.setOutputFormatClass(NullOutputFormat.class); 978 return job; 979 } 980 } 981 982 static final String FOUND_GROUP_KEY = "Found"; 983 static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir"; 984 985 public int run(Path inputDir, int numMappers) throws Exception { 986 getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString()); 987 SortedSet<byte[]> keys = readKeysToSearch(getConf()); 988 if (keys.isEmpty()) throw new RuntimeException("No keys to find"); 989 LOG.info("Count of keys to find: " + keys.size()); 990 for (byte[] key : keys) 991 LOG.info("Key: " + Bytes.toStringBinary(key)); 992 // Now read all WALs. In two dirs. Presumes certain layout. 
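      // That is, the live WAL dir (HConstants.HREGION_LOGDIR_NAME, i.e. "WALs") and the archive
      // dir (HConstants.HREGION_OLDLOGDIR_NAME, i.e. "oldWALs"), both resolved under the WAL root
      // dir just below.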
993 Path walsDir = 994 new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME); 995 Path oldWalsDir = 996 new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME); 997 LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers 998 + " against " + getConf().get(HConstants.HBASE_DIR)); 999 int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()), 1000 new String[] { walsDir.toString(), "" }); 1001 if (ret != 0) { 1002 return ret; 1003 } 1004 return ToolRunner.run(getConf(), new WALSearcher(getConf()), 1005 new String[] { oldWalsDir.toString(), "" }); 1006 } 1007 1008 static SortedSet<byte[]> readKeysToSearch(final Configuration conf) 1009 throws IOException, InterruptedException { 1010 Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY)); 1011 FileSystem fs = FileSystem.get(conf); 1012 SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1013 if (!fs.exists(keysInputDir)) { 1014 throw new FileNotFoundException(keysInputDir.toString()); 1015 } 1016 if (!fs.isDirectory(keysInputDir)) { 1017 throw new UnsupportedOperationException("TODO"); 1018 } else { 1019 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false); 1020 while (iterator.hasNext()) { 1021 LocatedFileStatus keyFileStatus = iterator.next(); 1022 // Skip "_SUCCESS" file. 1023 if (keyFileStatus.getPath().getName().startsWith("_")) continue; 1024 result.addAll(readFileToSearch(conf, keyFileStatus)); 1025 } 1026 } 1027 return result; 1028 } 1029 1030 private static SortedSet<byte[]> readFileToSearch(final Configuration conf, 1031 final LocatedFileStatus keyFileStatus) throws IOException, InterruptedException { 1032 SortedSet<byte[]> result = new TreeSet<>(Bytes.BYTES_COMPARATOR); 1033 // Return entries that are flagged VerifyCounts.UNDEFINED in the value. Return the row. This 1034 // is 1035 // what is missing. 1036 TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); 1037 try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr = 1038 new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) { 1039 InputSplit is = 1040 new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String[] {}); 1041 rr.initialize(is, context); 1042 while (rr.nextKeyValue()) { 1043 rr.getCurrentKey(); 1044 BytesWritable bw = rr.getCurrentValue(); 1045 if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.VerifyCounts.UNDEFINED) { 1046 byte[] key = new byte[rr.getCurrentKey().getLength()]; 1047 System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, 1048 rr.getCurrentKey().getLength()); 1049 result.add(key); 1050 } 1051 } 1052 } 1053 return result; 1054 } 1055 } 1056 1057 /** 1058 * A Map Reduce job that verifies that the linked lists generated by {@link Generator} do not have 1059 * any holes. 
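 * <p>
 * An illustrative invocation via the outer tool's command dispatch (the output path and reducer
 * count here are made up):
 *
 * <pre>
 * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList verify /tmp/itbll-verify 16
 * </pre>
 *
 * On a healthy run REFERENCED should equal the number of nodes generated, while UNDEFINED and
 * LOST_FAMILIES stay at zero; a non-zero UNREFERENCED count is also treated as a failure by
 * verifyExpectedValues (though it can be caused by duplicate random keys).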
1060 */ 1061 static class Verify extends Configured implements Tool { 1062 private static final Logger LOG = LoggerFactory.getLogger(Verify.class); 1063 protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 }); 1064 protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 }); 1065 protected Job job; 1066 1067 public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> { 1068 private BytesWritable row = new BytesWritable(); 1069 private BytesWritable ref = new BytesWritable(); 1070 private boolean multipleUnevenColumnFamilies; 1071 1072 @Override 1073 protected void 1074 setup(Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context) 1075 throws IOException, InterruptedException { 1076 this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration()); 1077 } 1078 1079 @Override 1080 protected void map(ImmutableBytesWritable key, Result value, Context context) 1081 throws IOException, InterruptedException { 1082 byte[] rowKey = key.get(); 1083 row.set(rowKey, 0, rowKey.length); 1084 if ( 1085 multipleUnevenColumnFamilies && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) 1086 || !value.containsColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME)) 1087 ) { 1088 context.write(row, DEF_LOST_FAMILIES); 1089 } else { 1090 context.write(row, DEF); 1091 } 1092 byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV); 1093 if (prev != null && prev.length > 0) { 1094 ref.set(prev, 0, prev.length); 1095 context.write(ref, row); 1096 } else { 1097 LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey))); 1098 } 1099 } 1100 } 1101 1102 /** 1103 * Don't change the order of these enums. Their ordinals are used as type flag when we emit 1104 * problems found from the reducer. 1105 */ 1106 public static enum VerifyCounts { 1107 UNREFERENCED, 1108 UNDEFINED, 1109 REFERENCED, 1110 CORRUPT, 1111 EXTRAREFERENCES, 1112 EXTRA_UNDEF_REFERENCES, 1113 LOST_FAMILIES 1114 } 1115 1116 /** 1117 * Per reducer, we output problem rows as byte arrays so can be used as input for subsequent 1118 * investigative mapreduce jobs. Each emitted value is prefaced by a one byte flag saying what 1119 * sort of emission it is. Flag is the Count enum ordinal as a short. 
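 * <p>
 * More precisely, the flag occupies Bytes.SIZEOF_SHORT bytes (see addPrefixFlag below). For
 * example, VerifyCounts.UNDEFINED has ordinal 1, so a value emitted for a missing row is the two
 * bytes 0x00 0x01 followed by the row. A sketch of round-tripping a value with the helpers in this
 * class (rowBytes is a made-up variable):
 *
 * <pre>
 * byte[] emitted = addPrefixFlag(VerifyCounts.UNDEFINED.ordinal(), rowBytes);
 * VerifyCounts type = whichType(emitted); // UNDEFINED
 * byte[] row = getRowOnly(new BytesWritable(emitted)); // rowBytes again
 * </pre>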
1120 */ 1121 public static class VerifyReducer 1122 extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> { 1123 private ArrayList<byte[]> refs = new ArrayList<>(); 1124 private final BytesWritable UNREF = 1125 new BytesWritable(addPrefixFlag(VerifyCounts.UNREFERENCED.ordinal(), new byte[] {})); 1126 private final BytesWritable LOSTFAM = 1127 new BytesWritable(addPrefixFlag(VerifyCounts.LOST_FAMILIES.ordinal(), new byte[] {})); 1128 1129 private AtomicInteger rows = new AtomicInteger(0); 1130 private Connection connection; 1131 1132 @Override 1133 protected void 1134 setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1135 throws IOException, InterruptedException { 1136 super.setup(context); 1137 this.connection = ConnectionFactory.createConnection(context.getConfiguration()); 1138 } 1139 1140 @Override 1141 protected void 1142 cleanup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context) 1143 throws IOException, InterruptedException { 1144 if (this.connection != null) { 1145 this.connection.close(); 1146 } 1147 super.cleanup(context); 1148 } 1149 1150 /** 1151 * @return A new byte array that has <code>ordinal</code> as a prefix on the front, taking 1152 * up Bytes.SIZEOF_SHORT bytes, followed by <code>r</code> 1153 */ 1154 public static byte[] addPrefixFlag(final int ordinal, final byte[] r) { 1155 byte[] prefix = Bytes.toBytes((short) ordinal); 1156 if (prefix.length != Bytes.SIZEOF_SHORT) { 1157 throw new RuntimeException("Unexpected size: " + prefix.length); 1158 } 1159 byte[] result = new byte[prefix.length + r.length]; 1160 System.arraycopy(prefix, 0, result, 0, prefix.length); 1161 System.arraycopy(r, 0, result, prefix.length, r.length); 1162 return result; 1163 } 1164 1165 /** 1166 * @return Type from the VerifyCounts enum of this row. Reads the prefix added by 1167 * {@link #addPrefixFlag(int, byte[])} 1168 */ 1169 public static VerifyCounts whichType(final byte[] bs) { 1170 int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT); 1171 return VerifyCounts.values()[ordinal]; 1172 } 1173 1174 /** 1175 * @return Row bytes minus the type flag. 1176 */ 1177 public static byte[] getRowOnly(BytesWritable bw) { 1178 byte[] bytes = new byte[bw.getLength() - Bytes.SIZEOF_SHORT]; 1179 System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length); 1180 return bytes; 1181 } 1182 1183 @Override 1184 public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context) 1185 throws IOException, InterruptedException { 1186 int defCount = 0; 1187 boolean lostFamilies = false; 1188 refs.clear(); 1189 for (BytesWritable type : values) { 1190 if (type.getLength() == DEF.getLength()) { 1191 defCount++; 1192 if (type.getBytes()[0] == 1) { 1193 lostFamilies = true; 1194 } 1195 } else { 1196 byte[] bytes = new byte[type.getLength()]; 1197 System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength()); 1198 refs.add(bytes); 1199 } 1200 } 1201 1202 // TODO check for more than one def, should not happen 1203 StringBuilder refsSb = null; 1204 if (defCount == 0 || refs.size() != 1) { 1205 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1206 refsSb = dumpExtraInfoOnRefs(key, context, refs); 1207 LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" 1208 + (refsSb != null ?
refsSb.toString() : "")); 1209 } 1210 if (lostFamilies) { 1211 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1212 LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families"); 1213 context.getCounter(VerifyCounts.LOST_FAMILIES).increment(1); 1214 context.write(key, LOSTFAM); 1215 } 1216 1217 if (defCount == 0 && refs.size() > 0) { 1218 // This is bad, found a node that is referenced but not defined. It must have been 1219 // lost, emit some info about this node for debugging purposes. 1220 // Write out a line per reference. If more than one, flag it.; 1221 for (int i = 0; i < refs.size(); i++) { 1222 byte[] bs = refs.get(i); 1223 int ordinal; 1224 if (i <= 0) { 1225 ordinal = VerifyCounts.UNDEFINED.ordinal(); 1226 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1227 context.getCounter(VerifyCounts.UNDEFINED).increment(1); 1228 } else { 1229 ordinal = VerifyCounts.EXTRA_UNDEF_REFERENCES.ordinal(); 1230 context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs))); 1231 } 1232 } 1233 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1234 // Print out missing row; doing get on reference gives info on when the referencer 1235 // was added which can help a little debugging. This info is only available in mapper 1236 // output -- the 'Linked List error Key...' log message above. What we emit here is 1237 // useless for debugging. 1238 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1239 context.getCounter("undef", keyString).increment(1); 1240 } 1241 } else if (defCount > 0 && refs.isEmpty()) { 1242 // node is defined but not referenced 1243 context.write(key, UNREF); 1244 context.getCounter(VerifyCounts.UNREFERENCED).increment(1); 1245 if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) { 1246 String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength()); 1247 context.getCounter("unref", keyString).increment(1); 1248 } 1249 } else { 1250 if (refs.size() > 1) { 1251 // Skip first reference. 1252 for (int i = 1; i < refs.size(); i++) { 1253 context.write(key, new BytesWritable( 1254 addPrefixFlag(VerifyCounts.EXTRAREFERENCES.ordinal(), refs.get(i)))); 1255 } 1256 context.getCounter(VerifyCounts.EXTRAREFERENCES).increment(refs.size() - 1); 1257 } 1258 // node is defined and referenced 1259 context.getCounter(VerifyCounts.REFERENCED).increment(1); 1260 } 1261 } 1262 1263 /** 1264 * Dump out extra info around references if there are any. Helps debugging. 1265 * @return StringBuilder filled with references if any. n 1266 */ 1267 @SuppressWarnings("JavaUtilDate") 1268 private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context, 1269 final List<byte[]> refs) throws IOException { 1270 StringBuilder refsSb = null; 1271 if (refs.isEmpty()) return refsSb; 1272 refsSb = new StringBuilder(); 1273 String comma = ""; 1274 // If a row is a reference but has no define, print the content of the row that has 1275 // this row as a 'prev'; it will help debug. The missing row was written just before 1276 // the row we are dumping out here. 1277 TableName tn = getTableName(context.getConfiguration()); 1278 try (Table t = this.connection.getTable(tn)) { 1279 for (byte[] ref : refs) { 1280 Result r = t.get(new Get(ref)); 1281 List<Cell> cells = r.listCells(); 1282 String ts = (cells != null && !cells.isEmpty()) 1283 ? 
new java.util.Date(cells.get(0).getTimestamp()).toString() 1284 : ""; 1285 byte[] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT); 1286 String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : ""; 1287 b = r.getValue(FAMILY_NAME, COLUMN_COUNT); 1288 long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1; 1289 b = r.getValue(FAMILY_NAME, COLUMN_PREV); 1290 String refRegionLocation = ""; 1291 String keyRegionLocation = ""; 1292 if (b != null && b.length > 0) { 1293 try (RegionLocator rl = this.connection.getRegionLocator(tn)) { 1294 HRegionLocation hrl = rl.getRegionLocation(b); 1295 if (hrl != null) refRegionLocation = hrl.toString(); 1296 // Key here probably has trailing zeros on it. 1297 hrl = rl.getRegionLocation(key.getBytes()); 1298 if (hrl != null) keyRegionLocation = hrl.toString(); 1299 } 1300 } 1301 LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) 1302 + ", refPrevEqualsKey=" 1303 + (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) 1304 + ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) 1305 + ", ref row date=" + ts + ", jobStr=" + jobStr + ", ref row count=" + count 1306 + ", ref row regionLocation=" + refRegionLocation + ", key row regionLocation=" 1307 + keyRegionLocation); 1308 refsSb.append(comma); 1309 comma = ","; 1310 refsSb.append(Bytes.toStringBinary(ref)); 1311 } 1312 } 1313 return refsSb; 1314 } 1315 } 1316 1317 @Override 1318 public int run(String[] args) throws Exception { 1319 if (args.length != 2) { 1320 System.out 1321 .println("Usage : " + Verify.class.getSimpleName() + " <output dir> <num reducers>"); 1322 return 0; 1323 } 1324 1325 String outputDir = args[0]; 1326 int numReducers = Integer.parseInt(args[1]); 1327 1328 return run(outputDir, numReducers); 1329 } 1330 1331 public int run(String outputDir, int numReducers) throws Exception { 1332 return run(new Path(outputDir), numReducers); 1333 } 1334 1335 public int run(Path outputDir, int numReducers) throws Exception { 1336 LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers); 1337 1338 job = Job.getInstance(getConf()); 1339 1340 job.setJobName("Link Verifier"); 1341 job.setNumReduceTasks(numReducers); 1342 job.setJarByClass(getClass()); 1343 1344 setJobScannerConf(job); 1345 1346 Scan scan = new Scan(); 1347 scan.addColumn(FAMILY_NAME, COLUMN_PREV); 1348 scan.setCaching(10000); 1349 scan.setCacheBlocks(false); 1350 if (isMultiUnevenColumnFamilies(getConf())) { 1351 scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME); 1352 scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME); 1353 } 1354 1355 TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan, 1356 VerifyMapper.class, BytesWritable.class, BytesWritable.class, job); 1357 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), 1358 AbstractHBaseTool.class); 1359 1360 job.getConfiguration().setBoolean("mapreduce.map.speculative", false); 1361 1362 job.setReducerClass(VerifyReducer.class); 1363 job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class); 1364 job.setOutputKeyClass(BytesWritable.class); 1365 job.setOutputValueClass(BytesWritable.class); 1366 TextOutputFormat.setOutputPath(job, outputDir); 1367 1368 boolean success = job.waitForCompletion(true); 1369 1370 if (success) { 1371 Counters counters = job.getCounters(); 1372 if (null == counters) { 1373 LOG.warn("Counters were null, cannot verify Job completion." 
1374 + " This is commonly a result of insufficient YARN configuration."); 1375 // We don't have access to the counters to know if we have "bad" counts 1376 return 0; 1377 } 1378 1379 // If we find no unexpected values, the job didn't outright fail 1380 if (verifyUnexpectedValues(counters)) { 1381 // We didn't check referenced+unreferenced counts, leave that to visual inspection 1382 return 0; 1383 } 1384 } 1385 1386 // We failed 1387 return 1; 1388 } 1389 1390 public boolean verify(long expectedReferenced) throws Exception { 1391 if (job == null) { 1392 throw new IllegalStateException("You should call run() first"); 1393 } 1394 1395 Counters counters = job.getCounters(); 1396 if (counters == null) { 1397 LOG.info("Counters object was null, write verification cannot be performed." 1398 + " This is commonly a result of insufficient YARN configuration."); 1399 return false; 1400 } 1401 1402 // Run through each check, even if we fail one early 1403 boolean success = verifyExpectedValues(expectedReferenced, counters); 1404 1405 if (!verifyUnexpectedValues(counters)) { 1406 // We found counter objects which imply failure 1407 success = false; 1408 } 1409 1410 if (!success) { 1411 handleFailure(counters); 1412 } 1413 return success; 1414 } 1415 1416 /** 1417 * Verify the values in the Counters against the expected number of entries written. n * 1418 * Expected number of referenced entrires n * The Job's Counters object 1419 * @return True if the values match what's expected, false otherwise 1420 */ 1421 protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) { 1422 final Counter referenced = counters.findCounter(VerifyCounts.REFERENCED); 1423 final Counter unreferenced = counters.findCounter(VerifyCounts.UNREFERENCED); 1424 boolean success = true; 1425 1426 if (expectedReferenced != referenced.getValue()) { 1427 LOG.error("Expected referenced count does not match with actual referenced count. " 1428 + "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue()); 1429 success = false; 1430 } 1431 1432 if (unreferenced.getValue() > 0) { 1433 final Counter multiref = counters.findCounter(VerifyCounts.EXTRAREFERENCES); 1434 boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue()); 1435 LOG.error( 1436 "Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue() 1437 + (couldBeMultiRef ? "; could be due to duplicate random numbers" : "")); 1438 success = false; 1439 } 1440 1441 return success; 1442 } 1443 1444 /** 1445 * Verify that the Counters don't contain values which indicate an outright failure from the 1446 * Reducers. n * The Job's counters 1447 * @return True if the "bad" counter objects are 0, false otherwise 1448 */ 1449 protected boolean verifyUnexpectedValues(Counters counters) { 1450 final Counter undefined = counters.findCounter(VerifyCounts.UNDEFINED); 1451 final Counter lostfamilies = counters.findCounter(VerifyCounts.LOST_FAMILIES); 1452 boolean success = true; 1453 1454 if (undefined.getValue() > 0) { 1455 LOG.error("Found an undefined node. 
Undefined count=" + undefined.getValue()); 1456 success = false; 1457 } 1458 1459 if (lostfamilies.getValue() > 0) { 1460 LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue()); 1461 success = false; 1462 } 1463 1464 return success; 1465 } 1466 1467 protected void handleFailure(Counters counters) throws IOException { 1468 Configuration conf = job.getConfiguration(); 1469 TableName tableName = getTableName(conf); 1470 try (Connection conn = ConnectionFactory.createConnection(conf)) { 1471 try (RegionLocator rl = conn.getRegionLocator(tableName)) { 1472 CounterGroup g = counters.getGroup("undef"); 1473 Iterator<Counter> it = g.iterator(); 1474 while (it.hasNext()) { 1475 String keyString = it.next().getName(); 1476 byte[] key = Bytes.toBytes(keyString); 1477 HRegionLocation loc = rl.getRegionLocation(key, true); 1478 LOG.error("undefined row " + keyString + ", " + loc); 1479 } 1480 g = counters.getGroup("unref"); 1481 it = g.iterator(); 1482 while (it.hasNext()) { 1483 String keyString = it.next().getName(); 1484 byte[] key = Bytes.toBytes(keyString); 1485 HRegionLocation loc = rl.getRegionLocation(key, true); 1486 LOG.error("unreferred row " + keyString + ", " + loc); 1487 } 1488 } 1489 } 1490 } 1491 } 1492 1493 /** 1494 * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each iteration 1495 * adds more data. 1496 */ 1497 static class Loop extends Configured implements Tool { 1498 1499 private static final Logger LOG = LoggerFactory.getLogger(Loop.class); 1500 private static final String USAGE = "Usage: Loop <num iterations> <num mappers> " 1501 + "<num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>" 1502 + " <num walker threads>] \n" 1503 + "where <num nodes per map> should be a multiple of width*wrap multiplier, 25M by default \n" 1504 + "walkers will select and verify random flushed loop during Generation."; 1505 1506 IntegrationTestBigLinkedList it; 1507 1508 protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width, 1509 Integer wrapMultiplier, Integer numWalkers) throws Exception { 1510 Path outputPath = new Path(outputDir); 1511 UUID uuid = UUID.randomUUID(); // create a random UUID. 1512 Path generatorOutput = new Path(outputPath, uuid.toString()); 1513 1514 Generator generator = new Generator(); 1515 generator.setConf(getConf()); 1516 int retCode = 1517 generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, numWalkers); 1518 if (retCode > 0) { 1519 throw new RuntimeException("Generator failed with return code: " + retCode); 1520 } 1521 if (numWalkers > 0) { 1522 if (!generator.verify()) { 1523 throw new RuntimeException("Generator.verify failed"); 1524 } 1525 } 1526 LOG.info("Generator finished with success. Total nodes=" + numNodes); 1527 } 1528 1529 protected void runVerify(String outputDir, int numReducers, long expectedNumNodes) 1530 throws Exception { 1531 Path outputPath = new Path(outputDir); 1532 UUID uuid = UUID.randomUUID(); // create a random UUID. 1533 Path iterationOutput = new Path(outputPath, uuid.toString()); 1534 1535 Verify verify = new Verify(); 1536 verify.setConf(getConf()); 1537 int retCode = verify.run(iterationOutput, numReducers); 1538 if (retCode > 0) { 1539 throw new RuntimeException("Verify.run failed with return code: " + retCode); 1540 } 1541 1542 if (!verify.verify(expectedNumNodes)) { 1543 throw new RuntimeException("Verify.verify failed"); 1544 } 1545 LOG.info("Verify finished with success. 
Total nodes=" + expectedNumNodes); 1546 } 1547 1548 @Override 1549 public int run(String[] args) throws Exception { 1550 if (args.length < 5) { 1551 System.err.println(USAGE); 1552 return 1; 1553 } 1554 try { 1555 int numIterations = Integer.parseInt(args[0]); 1556 int numMappers = Integer.parseInt(args[1]); 1557 long numNodes = Long.parseLong(args[2]); 1558 String outputDir = args[3]; 1559 int numReducers = Integer.parseInt(args[4]); 1560 Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]); 1561 Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]); 1562 Integer numWalkers = (args.length < 8) ? 0 : Integer.parseInt(args[7]); 1563 1564 long expectedNumNodes = 0; 1565 1566 if (numIterations < 0) { 1567 numIterations = Integer.MAX_VALUE; // run indefinitely (kind of) 1568 } 1569 LOG.info("Running Loop with args:" + Arrays.deepToString(args)); 1570 for (int i = 0; i < numIterations; i++) { 1571 LOG.info("Starting iteration = " + i); 1572 runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier, numWalkers); 1573 expectedNumNodes += numMappers * numNodes; 1574 runVerify(outputDir, numReducers, expectedNumNodes); 1575 } 1576 return 0; 1577 } catch (NumberFormatException e) { 1578 System.err.println("Parsing loop arguments failed: " + e.getMessage()); 1579 System.err.println(USAGE); 1580 return 1; 1581 } 1582 } 1583 } 1584 1585 /** 1586 * A stand alone program that prints out portions of a list created by {@link Generator} 1587 */ 1588 private static class Print extends Configured implements Tool { 1589 @Override 1590 public int run(String[] args) throws Exception { 1591 Options options = new Options(); 1592 options.addOption("s", "start", true, "start key"); 1593 options.addOption("e", "end", true, "end key"); 1594 options.addOption("l", "limit", true, "number to print"); 1595 1596 GnuParser parser = new GnuParser(); 1597 CommandLine cmd = null; 1598 try { 1599 cmd = parser.parse(options, args); 1600 if (cmd.getArgs().length != 0) { 1601 throw new ParseException("Command takes no arguments"); 1602 } 1603 } catch (ParseException e) { 1604 System.err.println("Failed to parse command line " + e.getMessage()); 1605 System.err.println(); 1606 HelpFormatter formatter = new HelpFormatter(); 1607 formatter.printHelp(getClass().getSimpleName(), options); 1608 System.exit(-1); 1609 } 1610 1611 Connection connection = ConnectionFactory.createConnection(getConf()); 1612 Table table = connection.getTable(getTableName(getConf())); 1613 1614 Scan scan = new Scan(); 1615 scan.setBatch(10000); 1616 1617 if (cmd.hasOption("s")) scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s"))); 1618 1619 if (cmd.hasOption("e")) scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e"))); 1620 1621 int limit = 0; 1622 if (cmd.hasOption("l")) limit = Integer.parseInt(cmd.getOptionValue("l")); 1623 else limit = 100; 1624 1625 ResultScanner scanner = table.getScanner(scan); 1626 1627 CINode node = new CINode(); 1628 Result result = scanner.next(); 1629 int count = 0; 1630 while (result != null && count++ < limit) { 1631 node = getCINode(result, node); 1632 System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key), 1633 Bytes.toStringBinary(node.prev), node.count, node.client); 1634 result = scanner.next(); 1635 } 1636 scanner.close(); 1637 table.close(); 1638 connection.close(); 1639 1640 return 0; 1641 } 1642 } 1643 1644 /** 1645 * A stand alone program that deletes a single node. 
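   * <p>
   * Note: the key argument is decoded with {@link Bytes#toBytesBinary(String)}, so it can be given
   * in the same escaped binary-string form that {@link Print} emits (for example, a row key copied
   * from Print's output).
   * </p>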
   */
  private static class Delete extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 1) {
        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
        return 0;
      }
      byte[] val = Bytes.toBytesBinary(args[0]);

      org.apache.hadoop.hbase.client.Delete delete = new org.apache.hadoop.hbase.client.Delete(val);

      try (Connection connection = ConnectionFactory.createConnection(getConf());
        Table table = connection.getTable(getTableName(getConf()))) {
        table.delete(delete);
      }

      System.out.println("Delete successful");
      return 0;
    }
  }

  abstract static class WalkerBase extends Configured {
    protected static CINode findStartNode(Table table, byte[] startKey) throws IOException {
      Scan scan = new Scan();
      scan.setStartRow(startKey);
      scan.setBatch(1);
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);

      long t1 = EnvironmentEdgeManager.currentTime();
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      long t2 = EnvironmentEdgeManager.currentTime();
      scanner.close();

      if (result != null) {
        CINode node = getCINode(result, new CINode());
        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
        return node;
      }

      System.out.println("FSR " + (t2 - t1));

      return null;
    }

    protected CINode getNode(byte[] row, Table table, CINode node) throws IOException {
      Get get = new Get(row);
      get.addColumn(FAMILY_NAME, COLUMN_PREV);
      Result result = table.get(get);
      return getCINode(result, node);
    }
  }

  /**
   * A stand alone program that follows a linked list created by {@link Generator} and prints
   * timing info.
   */
  private static class Walker extends WalkerBase implements Tool {

    public Walker() {
    }

    @Override
    public int run(String[] args) throws IOException {

      Options options = new Options();
      options.addOption("n", "num", true, "number of queries");
      options.addOption("s", "start", true, "key to start at, binary string");
      options.addOption("l", "logevery", true, "log every N queries");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      long maxQueries = Long.MAX_VALUE;
      if (cmd.hasOption('n')) {
        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
      }
      boolean isSpecificStart = cmd.hasOption('s');

      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;

      Connection connection = ConnectionFactory.createConnection(getConf());
      Table table = connection.getTable(getTableName(getConf()));
      long numQueries = 0;
      // If isSpecificStart is set, only walk one list from that particular node.
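      // Otherwise, a new random start key is chosen for each walk attempt below.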
      // Note that in the case of a circular (or P-shaped) list it will walk forever, as is
      // the case in a normal run without a startKey.
      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
        if (!isSpecificStart) {
          startKey = new byte[ROWKEY_LENGTH];
          Bytes.random(startKey);
        }
        CINode node = findStartNode(table, startKey);
        if (node == null && isSpecificStart) {
          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
        }
        numQueries++;
        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
          byte[] prev = node.prev;
          long t1 = EnvironmentEdgeManager.currentTime();
          node = getNode(prev, table, node);
          long t2 = EnvironmentEdgeManager.currentTime();
          if (logEvery > 0 && numQueries % logEvery == 0) {
            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
          }
          numQueries++;
          if (node == null) {
            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
          } else if (node.prev.length == NO_KEY.length) {
            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
          }
        }
      }
      table.close();
      connection.close();
      return 0;
    }
  }

  private static class Clean extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println("Usage: Clean <output dir>");
        return -1;
      }

      Path p = new Path(args[0]);
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (FileSystem fs = HFileSystem.get(conf);
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
        if (admin.tableExists(tableName)) {
          admin.disableTable(tableName);
          admin.deleteTable(tableName);
        }

        if (fs.exists(p)) {
          fs.delete(p, true);
        }
      }

      return 0;
    }
  }

  static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  private static CINode getCINode(Result result, CINode node) {
    node.key = Bytes.copy(result.getRow());
    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
    } else {
      node.prev = NO_KEY;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
    } else {
      node.count = -1;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
    } else {
      node.client = "";
    }
    return node;
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
  }

  @Test
  public void testContinuousIngest() throws IOException, Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
    Configuration conf = getTestingUtil(getConf()).getConfiguration();
    if (isMultiUnevenColumnFamilies(getConf())) {
      // make sure per CF flush is on
      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
        FlushAllLargeStoresPolicy.class.getName());
    }
    int ret = ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
      util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
    org.junit.Assert.assertEquals(0, ret);
  }

  private void usage() {
    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
    printCommands();
  }

  private void printCommands() {
    System.err.println("Commands:");
    System.err.println(" generator  Map only job that generates data.");
    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
    System.err.println("            look at the counts after running. REFERENCED and UNREFERENCED");
    System.err.println("            counts are ok. Any UNDEFINED counts are bad. Do not run this");
    System.err.println("            at the same time as the Generator.");
    System.err.println(" walker     "
      + "Standalone program that starts following a linked list & emits timing info.");
    System.err.println(" print      Standalone program that prints nodes in the linked list.");
    System.err.println(" delete     Standalone program that deletes a single node.");
    System.err.println(" loop       Program to loop through Generator and Verify steps.");
    System.err.println(" clean      Program to clean all left over detritus.");
    System.err.println(" search     Search for missing keys.");
    System.err.println("");
    System.err.println("General options:");
    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
    System.err.println(
      "    Run using the <tableName> as the tablename. Defaults to " + DEFAULT_TABLE_NAME);
    System.err.println(" -D" + HBaseTestingUtility.REGIONS_PER_SERVER_KEY + "=<# regions>");
    System.err.println("    Create table with presplit regions per server. Defaults to "
      + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);

    System.err.println(" -DuseMob=<true|false>");
    System.err.println(
      "    Create table so that the mob read/write path is forced. " + "Defaults to false");

    System.err.flush();
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    String[] args = cmd.getArgs();
    // get the class, run with the conf
    if (args.length < 1) {
      printUsage(this.getClass().getSimpleName() + " <general options> COMMAND [<COMMAND options>]",
        "General options:", "");
      printCommands();
      // Have to throw an exception here to stop the processing. Looks ugly but gets message across.
      throw new RuntimeException("Incorrect Number of args.");
    }
    toRun = args[0];
    otherArgs = Arrays.copyOfRange(args, 1, args.length);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    Tool tool = null;
    if (toRun.equalsIgnoreCase("Generator")) {
      tool = new Generator();
    } else if (toRun.equalsIgnoreCase("Verify")) {
      tool = new Verify();
    } else if (toRun.equalsIgnoreCase("Loop")) {
      Loop loop = new Loop();
      loop.it = this;
      tool = loop;
    } else if (toRun.equalsIgnoreCase("Walker")) {
      tool = new Walker();
    } else if (toRun.equalsIgnoreCase("Print")) {
      tool = new Print();
    } else if (toRun.equalsIgnoreCase("Delete")) {
      tool = new Delete();
    } else if (toRun.equalsIgnoreCase("Clean")) {
      tool = new Clean();
    } else if (toRun.equalsIgnoreCase("Search")) {
      tool = new Search();
    } else {
      usage();
      throw new RuntimeException("Unknown arg");
    }

    return ToolRunner.run(getConf(), tool, otherArgs);
  }

  @Override
  public TableName getTablename() {
    Configuration c = getConf();
    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    if (isMultiUnevenColumnFamilies(getConf())) {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
        Bytes.toString(TINY_FAMILY_NAME));
    } else {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
    }
  }

  private static void setJobConf(Job job, int numMappers, long numNodes, Integer width,
    Integer wrapMultiplier, Integer numWalkers) {
    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
    if (width != null) {
      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
    }
    if (wrapMultiplier != null) {
      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
    }
    if (numWalkers != null) {
      job.getConfiguration().setInt(CONCURRENT_WALKER_KEY, numWalkers);
    }
  }

  public static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
    System.exit(ret);
  }
}
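// Illustrative command lines only (the argument values and output path below are placeholders,
// not recommendations; Loop.USAGE and printCommands() above are the authoritative references):
//
//   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList loop 1 4 25000000 /tmp/itbll 4
//   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList print -l 10
//
// The loop arguments map to <num iterations> <num mappers> <num nodes per mapper> <output dir>
// <num reducers>.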